12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
73278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458345934603461346234633464346534663467346834693470347134723473347434753476347734783479348034813482348334843485348634873488348934903491349234933494349534963497349834993500350135023503350435053506350735083509351035113512351335143515351635173518351935203521352235233524352535263527352835293530353135323533353435353536353735383539354035413542354335443545354635473548354935503551355235533554355535563557355835593560356135623563356435653566356735683569357035713572357335743575357635773578357935803581358235833584358535863587358835893590359135923593359435953596359735983599360036013602360336043605360636073608360936103611361236133614361536163617361836193620362136223623362436253626362736283629363036313632363336343635363636373638363936403641364236433644364536463647364836493650365136523653365436553656365736583659366036613662366336643665366636673668366936703671367236733674367536763677367836793680368136823683368436853686368736883689369036913692369336943695369636973698369937003701370237033704370537063707370837093710371137123713371437153716371737183719372037213722372337243725372637273728372937303731373237333734373537363737373837393740374137423743374437453746374737483749375037513752375337543755375637573758375937603761376237633764376537663767376837693770377137723773377437753776377
73778377937803781378237833784378537863787378837893790379137923793379437953796379737983799380038013802380338043805380638073808380938103811381238133814381538163817381838193820382138223823382438253826382738283829383038313832383338343835383638373838383938403841384238433844384538463847384838493850385138523853385438553856385738583859386038613862386338643865386638673868386938703871387238733874387538763877387838793880388138823883388438853886388738883889389038913892389338943895389638973898389939003901390239033904390539063907390839093910391139123913391439153916391739183919392039213922392339243925392639273928392939303931393239333934393539363937393839393940394139423943394439453946394739483949395039513952395339543955395639573958395939603961396239633964396539663967396839693970397139723973397439753976397739783979398039813982398339843985398639873988398939903991399239933994399539963997399839994000400140024003400440054006400740084009401040114012401340144015401640174018401940204021402240234024402540264027402840294030403140324033403440354036403740384039404040414042404340444045404640474048404940504051405240534054405540564057405840594060406140624063406440654066406740684069407040714072407340744075407640774078407940804081408240834084408540864087408840894090409140924093409440954096409740984099410041014102410341044105410641074108410941104111411241134114411541164117411841194120412141224123412441254126412741284129413041314132413341344135413641374138413941404141414241434144414541464147414841494150415141524153415441554156415741584159416041614162416341644165416641674168416941704171417241734174417541764177417841794180418141824183418441854186418741884189419041914192419341944195419641974198419942004201420242034204420542064207420842094210421142124213421442154216421742184219422042214222422342244225422642274228422942304231423242334234423542364237423842394240424142424243424442454246424742484249425042514252425342544255425642574258425942604261426242634264426542664267426842694270427142724273427442754276427
74278427942804281428242834284428542864287428842894290429142924293429442954296429742984299430043014302430343044305430643074308430943104311431243134314431543164317431843194320432143224323432443254326432743284329433043314332433343344335433643374338433943404341434243434344434543464347434843494350435143524353435443554356435743584359436043614362436343644365436643674368436943704371437243734374437543764377437843794380438143824383438443854386438743884389439043914392439343944395439643974398439944004401440244034404440544064407440844094410441144124413441444154416441744184419442044214422442344244425442644274428442944304431443244334434443544364437443844394440444144424443444444454446444744484449445044514452445344544455445644574458445944604461446244634464446544664467446844694470447144724473447444754476447744784479448044814482448344844485448644874488448944904491449244934494449544964497449844994500450145024503450445054506450745084509451045114512451345144515451645174518451945204521452245234524452545264527452845294530453145324533453445354536453745384539454045414542454345444545454645474548454945504551455245534554455545564557455845594560456145624563456445654566456745684569457045714572457345744575457645774578457945804581458245834584458545864587458845894590459145924593459445954596459745984599460046014602460346044605460646074608460946104611461246134614461546164617461846194620462146224623462446254626462746284629463046314632463346344635463646374638463946404641464246434644464546464647464846494650465146524653465446554656465746584659466046614662466346644665466646674668466946704671467246734674467546764677467846794680468146824683468446854686468746884689469046914692469346944695469646974698469947004701470247034704470547064707470847094710471147124713471447154716471747184719472047214722472347244725472647274728472947304731473247334734473547364737473847394740474147424743474447454746474747484749475047514752475347544755475647574758475947604761476247634764476547664767476847694770477147724773477447754776477
74778477947804781478247834784478547864787478847894790479147924793479447954796479747984799480048014802480348044805480648074808480948104811481248134814481548164817481848194820482148224823482448254826482748284829483048314832483348344835483648374838483948404841 |
- # -*- coding: utf-8 -*-
- '''
- Tests for the file state
- '''
- # Import Python libs
- from __future__ import absolute_import, print_function, unicode_literals
- import errno
- import logging
- import os
- import re
- import sys
- import shutil
- import stat
- import tempfile
- import textwrap
- import filecmp
- log = logging.getLogger(__name__)
- # Import Salt Testing libs
- from tests.support.runtests import RUNTIME_VARS
- from tests.support.case import ModuleCase
- from tests.support.unit import skipIf
- from tests.support.helpers import (
- with_system_user_and_group,
- with_tempdir,
- with_tempfile,
- Webserver,
- dedent,
- )
- from tests.support.mixins import SaltReturnAssertsMixin
- # Import Salt libs
- import salt.utils.data
- import salt.utils.files
- import salt.utils.json
- import salt.utils.path
- import salt.utils.platform
- import salt.utils.stringutils
- import salt.serializers.configparser
- from salt.utils.versions import LooseVersion as _LooseVersion
- HAS_PWD = True
- try:
- import pwd
- except ImportError:
- HAS_PWD = False
- HAS_GRP = True
- try:
- import grp
- except ImportError:
- HAS_GRP = False
- # Import 3rd-party libs
- import pytest
- from salt.ext import six
- from salt.ext.six.moves import range # pylint: disable=import-error,redefined-builtin
- IS_WINDOWS = salt.utils.platform.is_windows()  # evaluated once at import time; tests in this module branch or skip on it
- BINARY_FILE = b'GIF89a\x01\x00\x01\x00\x80\x00\x00\x05\x04\x04\x00\x00\x00,\x00\x00\x00\x00\x01\x00\x01\x00\x00\x02\x02D\x01\x00;'  # short byte string that begins with the GIF89a signature
- TEST_SYSTEM_USER = 'test_system_user'  # NOTE(review): not referenced in the visible part of this file; presumably used with with_system_user_and_group - confirm
- TEST_SYSTEM_GROUP = 'test_system_group'  # NOTE(review): not referenced in the visible part of this file; presumably used with with_system_user_and_group - confirm
def _test_managed_file_mode_keep_helper(testcase, local=False):
    '''
    DRY helper function to run the same "mode: keep" test with a local or
    remote path.

    :param testcase: The running test case. It must provide ``run_function``,
        ``run_state``, ``assertSaltTrueReturn``, ``assertSaltFalseReturn``
        and ``assertEqual``.
    :param bool local: When ``True`` the state source is the filesystem path
        of the file under ``RUNTIME_VARS.BASE_FILES``; otherwise the
        ``salt://grail/scene33`` URL is used.
    '''
    name = os.path.join(RUNTIME_VARS.TMP, 'scene33')
    grail_fs_path = os.path.join(RUNTIME_VARS.BASE_FILES, 'grail', 'scene33')
    grail = grail_fs_path if local else 'salt://grail/scene33'

    # Get the current mode so that we can put the file back the way we
    # found it when we're done.
    grail_fs_mode = int(testcase.run_function('file.get_mode', [grail_fs_path]), 8)
    initial_mode = 0o770
    # Two different modes are applied one after the other. If the file in
    # file_roots was originally set to the first mode, the second pass still
    # guarantees that we observe an updated mode.
    new_modes = (0o600, 0o644)

    # Set the initial mode, so we can be assured that when we set the mode
    # to "keep", we're actually changing the permissions of the file to the
    # new mode.
    ret = testcase.run_state(
        'file.managed',
        name=name,
        mode=oct(initial_mode),
        source=grail,
    )

    if IS_WINDOWS:
        # On Windows the state is expected to fail; nothing was chmod'ed
        # yet, so there is nothing to restore.
        testcase.assertSaltFalseReturn(ret)
        return

    testcase.assertSaltTrueReturn(ret)

    try:
        for new_mode in new_modes:
            # Update the mode on the fileserver, then let "keep" copy it.
            os.chmod(grail_fs_path, new_mode)
            ret = testcase.run_state(
                'file.managed',
                name=name,
                mode='keep',
                source=grail,
            )
            testcase.assertSaltTrueReturn(ret)
            managed_mode = stat.S_IMODE(os.stat(name).st_mode)
            testcase.assertEqual(oct(managed_mode), oct(new_mode))
    finally:
        # Set the mode of the file in the file_roots back to what it
        # originally was, whether or not the assertions passed.
        os.chmod(grail_fs_path, grail_fs_mode)
- @pytest.mark.windows_whitelisted
- class FileTest(ModuleCase, SaltReturnAssertsMixin):
- '''
- Validate the file state
- '''
def _delete_file(self, path):
    '''
    Remove ``path`` on a best-effort basis.

    A path that does not exist is silently ignored; any other ``OSError``
    is logged instead of being raised.
    '''
    try:
        os.remove(path)
    except OSError as err:
        if err.errno == errno.ENOENT:
            # Already gone, nothing to report.
            return
        log.error('Failed to remove %s: %s', path, err)
def tearDown(self):
    '''
    Delete the ``salt`` system user if ``user.list_users`` reports it.
    '''
    user = 'salt'
    users = self.run_function('user.list_users')
    if isinstance(users, (list, tuple, set)):
        # Exact membership test, so that a name which merely contains
        # "salt" (e.g. "saltuser") does not trigger a delete.
        present = user in users
    else:
        # Anything else (for instance an error string) keeps the previous
        # substring check on the stringified return.
        present = user in str(users)
    if present:
        self.run_function('user.delete', [user])
def test_symlink(self):
    '''
    file.symlink: the state must succeed for a link/target pair in TMP
    '''
    link_path = os.path.join(RUNTIME_VARS.TMP, 'symlink')
    target = os.path.join(RUNTIME_VARS.TMP, 'target')

    if IS_WINDOWS:
        # Windows must have a source directory to link to
        if not os.path.isdir(target):
            os.mkdir(target)
        # Windows cannot create a symlink if it already exists
        if self.run_function('file.is_link', [link_path]):
            self.run_function('file.remove', [link_path])

    state_ret = self.run_state('file.symlink', name=link_path, target=target)
    self.assertSaltTrueReturn(state_ret)
def test_test_symlink(self):
    '''
    file.symlink test interface: with test=True the state returns None
    '''
    state_ret = self.run_state(
        'file.symlink',
        test=True,
        name=os.path.join(RUNTIME_VARS.TMP, 'symlink2'),
        target=os.path.join(RUNTIME_VARS.TMP, 'target'),
    )
    self.assertSaltNoneReturn(state_ret)
def test_absent_file(self):
    '''
    file.absent on a regular file: the file must be gone afterwards
    '''
    victim = os.path.join(RUNTIME_VARS.TMP, 'file_to_kill')
    # Create the file that the state is expected to remove.
    with salt.utils.files.fopen(victim, 'w+') as handle:
        handle.write('killme')

    state_ret = self.run_state('file.absent', name=victim)

    self.assertSaltTrueReturn(state_ret)
    self.assertFalse(os.path.isfile(victim))
def test_absent_dir(self):
    '''
    file.absent on a directory: the directory must be gone afterwards
    '''
    victim = os.path.join(RUNTIME_VARS.TMP, 'dir_to_kill')
    already_there = os.path.isdir(victim)
    if not already_there:
        # Only create it when missing; a directory left behind by an
        # earlier run must not make this test fail.
        os.makedirs(victim)

    state_ret = self.run_state('file.absent', name=victim)

    self.assertSaltTrueReturn(state_ret)
    self.assertFalse(os.path.isdir(victim))
def test_absent_link(self):
    '''
    file.absent on a symlink: the link must be gone afterwards
    '''
    link_path = os.path.join(RUNTIME_VARS.TMP, 'link_to_kill')
    target = '{0}.tgt'.format(link_path)

    # Windows must have a source directory to link to
    if IS_WINDOWS and not os.path.isdir(target):
        os.mkdir(target)

    # Make sure there is a link for the state to remove.
    link_exists = self.run_function('file.is_link', [link_path])
    if not link_exists:
        self.run_function('file.symlink', [target, link_path])

    state_ret = self.run_state('file.absent', name=link_path)

    try:
        self.assertSaltTrueReturn(state_ret)
        self.assertFalse(self.run_function('file.is_link', [link_path]))
    finally:
        # Do not leave the link around if the state did not remove it.
        if self.run_function('file.is_link', [link_path]):
            self.run_function('file.remove', [link_path])
@with_tempfile()
def test_test_absent(self, name):
    '''
    file.absent test interface: with test=True the file is left in place
    '''
    with salt.utils.files.fopen(name, 'w+') as handle:
        handle.write('killme')

    state_ret = self.run_state('file.absent', test=True, name=name)

    self.assertSaltNoneReturn(state_ret)
    self.assertTrue(os.path.isfile(name))
def test_managed(self):
    '''
    file.managed: the managed file must have the same content as the
    salt://grail/scene33 source
    '''
    name = os.path.join(RUNTIME_VARS.TMP, 'grail_scene33')
    src = os.path.join(RUNTIME_VARS.BASE_FILES, 'grail', 'scene33')

    ret = self.run_state(
        'file.managed', name=name, source='salt://grail/scene33'
    )
    # Check the state result before touching the managed file: if the
    # state failed, opening ``name`` could raise and hide the real failure.
    self.assertSaltTrueReturn(ret)

    with salt.utils.files.fopen(src, 'r') as fp_:
        master_data = fp_.read()
    with salt.utils.files.fopen(name, 'r') as fp_:
        minion_data = fp_.read()
    self.assertEqual(master_data, minion_data)
def test_managed_file_mode(self):
    '''
    file.managed, correct file permissions
    '''
    desired_mode = 0o770
    name = os.path.join(RUNTIME_VARS.TMP, 'grail_scene33')
    ret = self.run_state(
        'file.managed', name=name, mode='0770', source='salt://grail/scene33'
    )

    if IS_WINDOWS:
        # The state is expected to fail with this comment on Windows.
        expected = 'The \'mode\' option is not supported on Windows'
        self.assertEqual(ret[list(ret)[0]]['comment'], expected)
        self.assertSaltFalseReturn(ret)
        return

    # Assert the state result first, so a failed state is reported as such
    # rather than as an error from os.stat() on a missing file.
    self.assertSaltTrueReturn(ret)
    resulting_mode = stat.S_IMODE(os.stat(name).st_mode)
    self.assertEqual(oct(desired_mode), oct(resulting_mode))
@skipIf(IS_WINDOWS, 'Windows does not report any file modes. Skipping.')
def test_managed_file_mode_keep(self):
    '''
    "mode: keep" in a file.managed state, using the salt:// source
    '''
    # ``local`` defaults to False in the helper.
    _test_managed_file_mode_keep_helper(self)
@skipIf(IS_WINDOWS, 'Windows does not report any file modes. Skipping.')
def test_managed_file_mode_keep_local_source(self):
    '''
    "mode: keep" in a file.managed state, using a local file path as the
    source instead of a salt:// URL.
    '''
    _test_managed_file_mode_keep_helper(testcase=self, local=True)
def test_managed_file_mode_file_exists_replace(self):
    '''
    file.managed, existing file with replace=True, change permissions
    '''
    initial_mode = 0o770
    desired_mode = 0o600
    name = os.path.join(RUNTIME_VARS.TMP, 'grail_scene33')
    source = 'salt://grail/scene33'

    # First run: create the file with the initial mode.
    ret = self.run_state(
        'file.managed', name=name, mode=oct(initial_mode), source=source
    )

    if IS_WINDOWS:
        # The state is expected to fail with this comment on Windows.
        expected = 'The \'mode\' option is not supported on Windows'
        self.assertEqual(ret[list(ret)[0]]['comment'], expected)
        self.assertSaltFalseReturn(ret)
        return

    resulting_mode = stat.S_IMODE(os.stat(name).st_mode)
    self.assertEqual(oct(initial_mode), oct(resulting_mode))

    # Second run against the now-existing file, asking for a new mode.
    ret = self.run_state(
        'file.managed', name=name, replace=True, mode=oct(desired_mode), source=source
    )
    # Assert the state result before inspecting the file on disk.
    self.assertSaltTrueReturn(ret)
    resulting_mode = stat.S_IMODE(os.stat(name).st_mode)
    self.assertEqual(oct(desired_mode), oct(resulting_mode))
def test_managed_file_mode_file_exists_noreplace(self):
    '''
    file.managed, existing file with replace=False, change permissions
    '''
    initial_mode = 0o770
    desired_mode = 0o600
    name = os.path.join(RUNTIME_VARS.TMP, 'grail_scene33')
    source = 'salt://grail/scene33'

    # First run: put the file in place with the initial mode.
    ret = self.run_state(
        'file.managed', name=name, replace=True, mode=oct(initial_mode), source=source
    )

    if IS_WINDOWS:
        # The state is expected to fail with this comment on Windows.
        expected = 'The \'mode\' option is not supported on Windows'
        self.assertEqual(ret[list(ret)[0]]['comment'], expected)
        self.assertSaltFalseReturn(ret)
        return

    # Second run with replace=False: the mode is still expected to change.
    ret = self.run_state(
        'file.managed', name=name, replace=False, mode=oct(desired_mode), source=source
    )
    # Assert the state result before inspecting the file on disk.
    self.assertSaltTrueReturn(ret)
    resulting_mode = stat.S_IMODE(os.stat(name).st_mode)
    self.assertEqual(oct(desired_mode), oct(resulting_mode))
def test_managed_file_with_grains_data(self):
    '''
    Run the ``file-grainget`` sls and check that the file it writes to the
    ``grain_path`` pillar location starts with a ``minion`` line.
    '''
    grain_path = os.path.join(RUNTIME_VARS.TMP, 'file-grain-test')
    self.run_function(
        'state.sls',
        ['file-grainget'],
        pillar={'grain_path': grain_path},
    )
    self.assertTrue(os.path.exists(grain_path))

    with salt.utils.files.fopen(grain_path, 'r') as handle:
        lines = handle.readlines()

    # The expected line terminator differs per platform.
    pattern = '^minion\r\n' if IS_WINDOWS else '^minion\n'
    self.assertTrue(re.match(pattern, lines[0]))
def test_managed_file_with_pillar_sls(self):
    '''
    Run the ``file-pillarget`` sls and check that it succeeds and that the
    ``filepillar-python`` file exists afterwards.
    '''
    target = os.path.join(RUNTIME_VARS.TMP, 'filepillar-python')
    # Registered up front so the file is removed even if an assertion fails.
    self.addCleanup(self._delete_file, target)
    log.warning('File Path: %s', target)

    sls_ret = self.run_function('state.sls', ['file-pillarget'])
    self.assertSaltTrueReturn(sls_ret)

    # Check to make sure the file was created
    self.assertTrue(self.run_function('file.file_exists', [target]))
def test_managed_file_with_pillardefault_sls(self):
    '''
    Run the ``file-pillardefaultget`` sls and check that it succeeds and
    that the ``filepillar-defaultvalue`` file exists afterwards.
    '''
    target = os.path.join(RUNTIME_VARS.TMP, 'filepillar-defaultvalue')
    # Registered up front so the file is removed even if an assertion fails.
    self.addCleanup(self._delete_file, target)
    log.warning('File Path: %s', target)

    sls_ret = self.run_function('state.sls', ['file-pillardefaultget'])
    self.assertSaltTrueReturn(sls_ret)

    # Check to make sure the file was created
    self.assertTrue(self.run_function('file.file_exists', [target]))
@pytest.mark.skip_if_not_root
def test_managed_dir_mode(self):
    '''
    Tests to ensure that file.managed creates directories with the
    permissions requested with the dir_mode argument, owned by the
    requested user
    '''
    desired_mode = 0o777
    desired_owner = 'nobody'
    parent_dir = os.path.join(RUNTIME_VARS.TMP, 'a')
    name = os.path.join(parent_dir, 'managed_dir_mode_test_file')

    ret = self.run_state(
        'file.managed',
        name=name,
        source='salt://grail/scene33',
        # NOTE(review): passed as the integer 600, not an octal literal or
        # a string; presumably normalised by the state - confirm.
        mode=600,
        makedirs=True,
        user=desired_owner,
        dir_mode=oct(desired_mode),  # 0777
    )

    if IS_WINDOWS:
        # The state is expected to fail with this comment on Windows.
        expected = 'The \'mode\' option is not supported on Windows'
        self.assertEqual(ret[list(ret)[0]]['comment'], expected)
        self.assertSaltFalseReturn(ret)
        return

    # Assert the state result before inspecting the directory on disk.
    self.assertSaltTrueReturn(ret)

    # One stat() call provides both the mode and the owner of the directory.
    parent_stat = os.stat(parent_dir)
    resulting_mode = stat.S_IMODE(parent_stat.st_mode)
    resulting_owner = pwd.getpwuid(parent_stat.st_uid).pw_name
    self.assertEqual(oct(desired_mode), oct(resulting_mode))
    self.assertEqual(desired_owner, resulting_owner)
def test_test_managed(self):
    '''
    file.managed test interface: with test=True no file is created
    '''
    target = os.path.join(RUNTIME_VARS.TMP, 'grail_not_not_scene33')
    state_ret = self.run_state(
        'file.managed',
        test=True,
        name=target,
        source='salt://grail/scene33',
    )
    self.assertSaltNoneReturn(state_ret)
    self.assertFalse(os.path.isfile(target))
def test_managed_show_changes_false(self):
    '''
    file.managed with show_changes=False: the reported diff is the
    ``<show_changes=False>`` placeholder
    '''
    target = os.path.join(RUNTIME_VARS.TMP, 'grail_not_scene33')
    # Pre-populate the target with content that differs from the source.
    with salt.utils.files.fopen(target, 'wb') as handle:
        handle.write(b'test_managed_show_changes_false\n')

    state_ret = self.run_state(
        'file.managed',
        name=target,
        source='salt://grail/scene33',
        show_changes=False,
    )

    first_result = next(six.itervalues(state_ret))
    self.assertEqual('<show_changes=False>', first_result['changes']['diff'])
def test_managed_show_changes_true(self):
    '''
    file.managed without show_changes: the reported changes contain a
    ``diff`` entry
    '''
    target = os.path.join(RUNTIME_VARS.TMP, 'grail_not_scene33')
    # Pre-populate the target with content that differs from the source.
    with salt.utils.files.fopen(target, 'wb') as handle:
        handle.write(b'test_managed_show_changes_false\n')

    state_ret = self.run_state(
        'file.managed',
        name=target,
        source='salt://grail/scene33',
    )

    first_result = next(six.itervalues(state_ret))
    self.assertIn('diff', first_result['changes'])
@skipIf(IS_WINDOWS, 'Don\'t know how to fix for Windows')
def test_managed_escaped_file_path(self):
    '''
    file.managed test that 'salt://|' protects unusual characters in file path
    '''
    odd_target = salt.utils.files.mkstemp(prefix='?f!le? n@=3&', suffix='.file type')
    odd_basename = os.path.basename(odd_target)
    odd_url = 'salt://|{0}'.format(odd_basename)
    odd_source = os.path.join(RUNTIME_VARS.BASE_FILES, odd_basename)

    sls_name = 'funny_file'
    sls_path = os.path.join(RUNTIME_VARS.BASE_FILES, sls_name + '.sls')
    # Key under which the state's result is looked up in the return
    ret_key = 'file_|-{0}_|-{0}_|-managed'.format(odd_target)

    # Register cleanups up front, in the same order as before
    for leftover in (sls_path, odd_target, odd_source):
        self.addCleanup(os.remove, leftover)

    # Empty source file with the same odd basename under BASE_FILES
    with salt.utils.files.fopen(odd_source, 'w'):
        pass

    sls_template = '''\
        {0}:
          file.managed:
            - source: {1}
            - makedirs: True
        '''
    with salt.utils.files.fopen(sls_path, 'w') as handle:
        handle.write(textwrap.dedent(sls_template.format(odd_target, odd_url)))

    ret = self.run_function('state.sls', [sls_name])
    self.assertTrue(ret[ret_key]['result'])
def test_managed_contents(self):
    '''
    test file.managed with contents that is a boolean, string, integer,
    float, list, and dictionary
    '''
    sls_name = 'file-FileTest-test_managed_contents'
    sls_path = os.path.join(RUNTIME_VARS.BASE_FILES, sls_name + '.sls')

    # One temp file and one return key per contents type
    targets = {}
    ret_keys = {}
    for kind in ('bool', 'str', 'int', 'float', 'list', 'dict'):
        path = salt.utils.files.mkstemp()
        targets[kind] = path
        ret_keys[kind] = 'file_|-{0} file_|-{1}_|-managed'.format(kind, path)

    sls_template = '''\
        bool file:
          file.managed:
            - name: {bool}
            - contents: True
        str file:
          file.managed:
            - name: {str}
            - contents: Salt was here.
        int file:
          file.managed:
            - name: {int}
            - contents: 340282366920938463463374607431768211456
        float file:
          file.managed:
            - name: {float}
            - contents: 1.7518e-45 # gravitational coupling constant
        list file:
          file.managed:
            - name: {list}
            - contents: [1, 1, 2, 3, 5, 8, 13]
        dict file:
          file.managed:
            - name: {dict}
            - contents:
                C: charge
                P: parity
                T: time
        '''

    try:
        with salt.utils.files.fopen(sls_path, 'w') as handle:
            handle.write(textwrap.dedent(sls_template.format(**targets)))

        ret = self.run_function('state.sls', [sls_name])
        self.assertSaltTrueReturn(ret)
        # Every state must succeed and report a diff
        for kind in ret_keys:
            state_ret = ret[ret_keys[kind]]
            self.assertTrue(state_ret['result'])
            self.assertIn('diff', state_ret['changes'])
    finally:
        # Remove the SLS file and every managed file that still exists
        if os.path.exists(sls_path):
            os.remove(sls_path)
        for kind in targets:
            path = targets[kind]
            if os.path.exists(path):
                os.remove(path)
def test_managed_contents_with_contents_newline(self):
    '''
    test file.managed with contents and contents_newline=True

    The file read back must equal the contents followed by os.linesep.
    '''
    text = 'test_managed_contents_with_newline_one'
    target = os.path.join(RUNTIME_VARS.TMP, 'foo')
    # Manage a file named foo from the contents, passing contents_newline=True
    self.run_state(
        'file.managed',
        name=target,
        contents=text,
        contents_newline=True,
    )
    with salt.utils.files.fopen(target, 'r') as handle:
        on_disk = handle.read()
    expected = text + os.linesep
    self.assertEqual(expected, on_disk)
def test_managed_contents_with_contents_newline_false(self):
    '''
    test file.managed with contents and contents_newline=False

    The file read back must equal the contents exactly.
    '''
    text = 'test_managed_contents_with_newline_one'
    target = os.path.join(RUNTIME_VARS.TMP, 'bar')
    # Manage a file named bar from the contents, passing contents_newline=False
    self.run_state(
        'file.managed',
        name=target,
        contents=text,
        contents_newline=False,
    )
    with salt.utils.files.fopen(target, 'r') as handle:
        on_disk = handle.read()
    self.assertEqual(text, on_disk)
def test_managed_multiline_contents_with_contents_newline(self):
    '''
    test file.managed with two-line contents and contents_newline=True

    The file read back must equal the contents followed by os.linesep.
    '''
    # Two lines separated by the platform line separator
    text = os.linesep.join(['this is a cookie', 'this is another cookie'])
    target = os.path.join(RUNTIME_VARS.TMP, 'bar')
    # Manage a file named bar from the contents, passing contents_newline=True
    self.run_state(
        'file.managed',
        name=target,
        contents=text,
        contents_newline=True,
    )
    with salt.utils.files.fopen(target, 'r') as handle:
        on_disk = handle.read()
    expected = text + os.linesep
    self.assertEqual(expected, on_disk)
def test_managed_multiline_contents_with_contents_newline_false(self):
    '''
    test file.managed with two-line contents and contents_newline=False

    The file read back must equal the contents exactly.
    '''
    # Two lines separated by the platform line separator
    text = os.linesep.join(['this is a cookie', 'this is another cookie'])
    target = os.path.join(RUNTIME_VARS.TMP, 'bar')
    # Manage a file named bar from the contents, passing contents_newline=False
    self.run_state(
        'file.managed',
        name=target,
        contents=text,
        contents_newline=False,
    )
    with salt.utils.files.fopen(target, 'r') as handle:
        on_disk = handle.read()
    self.assertEqual(text, on_disk)
@pytest.mark.skip_if_not_root
@skipIf(IS_WINDOWS, 'Windows does not support "mode" kwarg. Skipping.')
@skipIf(not salt.utils.path.which('visudo'), 'sudo is missing')
def test_managed_check_cmd(self):
    '''
    Test file.managed passing a basic check_cmd kwarg. See Issue #38111.
    '''
    sudoers = '/tmp/sudoers'
    # The group passed alongside user root is 'wheel' on macOS, 'root' elsewhere
    root_group = 'wheel' if salt.utils.platform.is_darwin() else 'root'
    ret_key = 'file_|-{0}_|-{0}_|-managed'.format(sudoers)
    expected_changes = {'new': 'file /tmp/sudoers created', 'mode': '0440'}
    try:
        ret = self.run_state(
            'file.managed',
            name=sudoers,
            user='root',
            group=root_group,
            mode=440,
            check_cmd='visudo -c -s -f',
        )
        self.assertSaltTrueReturn(ret)
        self.assertInSaltComment('Empty file', ret)
        self.assertEqual(ret[ret_key]['changes'], expected_changes)
    finally:
        # Clean Up File
        if os.path.exists(sudoers):
            os.remove(sudoers)
def test_managed_local_source_with_source_hash(self):
    '''
    Make sure that we enforce the source_hash even with local files

    For each way of spelling the local source ('file://' prefix and bare
    path) the state is run twice: once with a deliberately wrong sha1
    (must fail, report no changes and mention the checksum mismatch) and
    once with the real sha1 (must succeed). The whole sequence is run
    both with the destination missing and with it already present.
    '''
    name = os.path.join(RUNTIME_VARS.TMP, 'local_source_with_source_hash')
    local_path = os.path.join(RUNTIME_VARS.BASE_FILES, 'grail', 'scene33')
    actual_hash = '567fd840bf1548edc35c48eb66cdd78bfdfcccff'
    if IS_WINDOWS:
        # CRLF vs LF causes a different hash on windows
        actual_hash = 'f658a0ec121d9c17088795afcc6ff3c43cb9842a'
    # Reverse the actual hash
    bad_hash = actual_hash[::-1]

    def remove_file():
        '''
        Remove the destination file, ignoring "file not found"
        '''
        try:
            os.remove(name)
        except OSError as exc:
            if exc.errno != errno.ENOENT:
                raise

    def do_test(clean=False):
        '''
        Run the bad-hash/good-hash sequence for both source spellings.
        With clean=True the destination is removed after each spelling.
        '''
        for proto in ('file://', ''):
            source = proto + local_path
            log.debug('Trying source %s', source)
            try:
                ret = self.run_state(
                    'file.managed',
                    name=name,
                    source=source,
                    source_hash='sha1={0}'.format(bad_hash))
                self.assertSaltFalseReturn(ret)
                ret = ret[next(iter(ret))]
                # Shouldn't be any changes
                self.assertFalse(ret['changes'])
                # Check that we identified a hash mismatch
                self.assertIn(
                    'does not match actual checksum', ret['comment'])

                ret = self.run_state(
                    'file.managed',
                    name=name,
                    source=source,
                    source_hash='sha1={0}'.format(actual_hash))
                self.assertSaltTrueReturn(ret)
            finally:
                if clean:
                    remove_file()

    remove_file()
    log.debug('Trying with nonexistant destination file')
    # clean=True so that the second source spelling also starts without
    # a destination file; previously the file laid down by the first
    # spelling was still there, so only one spelling was really tested
    # against a missing destination.
    do_test(clean=True)

    log.debug('Trying with destination file already present')
    with salt.utils.files.fopen(name, 'w'):
        pass
    try:
        do_test(clean=False)
    finally:
        remove_file()
def test_managed_local_source_does_not_exist(self):
    '''
    Make sure that we exit gracefully when a local source doesn't exist
    '''
    target = os.path.join(RUNTIME_VARS.TMP, 'local_source_does_not_exist')
    missing_source = os.path.join(RUNTIME_VARS.BASE_FILES, 'grail', 'scene99')
    # Try both the file:// form and the bare path
    candidates = ['file://' + missing_source, missing_source]
    for source in candidates:
        log.debug('Trying source %s', source)
        result = self.run_state('file.managed', name=target, source=source)
        self.assertSaltFalseReturn(result)
        state_ret = result[next(iter(result))]
        # Shouldn't be any changes
        self.assertFalse(state_ret['changes'])
        # Check that the comment reports the missing source
        self.assertIn('does not exist', state_ret['comment'])
def test_managed_unicode_jinja_with_tojson_filter(self):
    '''
    Using {{ varname }} with a list or dictionary which contains unicode
    types on Python 2 will result in Jinja rendering the "u" prefix on each
    string. This tests that using the "tojson" jinja filter will dump them
    to a format which can be successfully loaded by our YAML loader.

    The two lines that should end up being rendered are meant to test two
    issues that would trip up PyYAML if the "tojson" filter were not used:

    1. A unicode string type would be loaded as a unicode literal with the
       leading "u" as well as the quotes, rather than simply being loaded
       as the proper unicode type which matches the content of the string
       literal. In other words, u'foo' would be loaded literally as
       u"u'foo'". This test includes actual non-ascii unicode in one of the
       strings to confirm that this also handles these international
       characters properly.

    2. Any unicode string type (such as a URL) which contains a colon would
       cause a ScannerError in PyYAML, as it would be assumed to delimit a
       mapping node.

    Dumping the data structure to JSON using the "tojson" jinja filter
    should produce an inline data structure which is valid YAML and will be
    loaded properly by our YAML loader.
    '''
    target = os.path.join(RUNTIME_VARS.TMP, 'test-tojson.txt')
    apply_ret = self.run_function(
        'state.apply',
        mods='tojson',
        pillar={'tojson-file': target},
    )
    # Only the first state return is inspected
    ret = apply_ret[next(iter(apply_ret))]
    assert ret['result'], ret

    # Read raw bytes and decode explicitly
    with salt.utils.files.fopen(target, mode='rb') as handle:
        raw = handle.read()
    managed = salt.utils.stringutils.to_unicode(raw)

    expected = dedent('''\
        Die Webseite ist https://saltstack.com.
        Der Zucker ist süß.
        ''')
    assert managed == expected, '{0!r} != {1!r}'.format(managed, expected)  # pylint: disable=repr-flag-used-in-string
def test_managed_source_hash_indifferent_case(self):
    '''
    Test passing a source_hash as an uppercase hash.

    This is a regression test for Issue #38914 and Issue #48230 (test=true use).
    '''
    target = os.path.join(RUNTIME_VARS.TMP, 'source_hash_indifferent_case')
    ret_key = 'file_|-{0}_|-{0}_|-managed'.format(target)
    source_path = os.path.join(RUNTIME_VARS.BASE_FILES, 'hello_world.txt')
    if IS_WINDOWS:
        # CRLF vs LF causes a different hash on windows
        real_hash = '92b772380a3f8e27a93e57e6deeca6c01da07f5aadce78bb2fbb20de10a66925'
    else:
        real_hash = 'c98c24b677eff44860afea6f493bbaec5bb1c4cbb209c6fc2bbb47f66ff2ad31'
    shouting_hash = real_hash.upper()

    try:
        # Lay down tmp file to test against
        self.run_state(
            'file.managed',
            name=target,
            source=source_path,
            source_hash=real_hash,
        )

        # Uppercase source_hash, first as a regular run and then with
        # test=True: both should return True with no changes
        for extra_kwargs in ({}, {'test': True}):
            ret = self.run_state(
                'file.managed',
                name=target,
                source=source_path,
                source_hash=shouting_hash,
                **extra_kwargs
            )
            assert ret[ret_key]['result'] is True
            assert ret[ret_key]['changes'] == {}
    finally:
        # Clean Up File
        if os.path.exists(target):
            os.remove(target)
@with_tempfile(create=False)
def test_managed_latin1_diff(self, name):
    '''
    Tests that latin-1 file contents are represented properly in the diff
    '''
    # First lay down the initial file, then replace it with the new one.
    # Both runs must succeed; the return of the second run is kept.
    state_ret = None
    for source in ('salt://issue-48777/old.html', 'salt://issue-48777/new.html'):
        result = self.run_state('file.managed', name=name, source=source)
        state_ret = result[next(iter(result))]
        assert state_ret['result'] is True, state_ret

    # Check the diff produced by the replacement
    diff_lines = state_ret['changes']['diff'].split(os.linesep)
    assert '+räksmörgås' in diff_lines, diff_lines
@with_tempfile()
def test_managed_keep_source_false_salt(self, name):
    '''
    This test ensures that we properly clean the cached file if keep_source
    is set to False, for source files using a salt:// URL
    '''
    source, saltenv = 'salt://grail/scene33', 'base'

    # Run the state
    result = self.run_state(
        'file.managed',
        name=name,
        source=source,
        saltenv=saltenv,
        keep_source=False,
    )
    state_ret = result[next(iter(result))]
    assert state_ret['result'] is True

    # Now make sure that the file is not cached
    cached_path = self.run_function('cp.is_cached', [source, saltenv])
    assert cached_path == '', 'File is still cached at {0}'.format(cached_path)
@with_tempfile(create=False)
@with_tempfile(create=False)
def test_file_managed_onchanges(self, file1, file2):
    '''
    Test file.managed state with onchanges
    '''
    source = 'salt://testfile'
    pillar = {
        'file1': file1,
        'file2': file2,
        'source': source,
        'req': 'onchanges',
    }

    # Lay down the file used in the below SLS to ensure that when it is
    # run, there are no changes.
    self.run_state('file.managed', name=file2, source=source)

    raw_ret = self.run_function(
        'state.apply',
        mods='onchanges_prereq',
        pillar=pillar,
        test=True,
    )
    ret = self.repack_state_returns(raw_ret)

    # In test mode 'one' must report None and 'three' must report True
    assert ret['one']['result'] is None, ret['one']['result']
    assert ret['three']['result'] is True, ret['three']['result']

    # The first file state should have changes, since a new file was
    # created. The other one should not, since we already created that file
    # before applying the SLS file.
    assert ret['one']['changes']
    assert not ret['three']['changes'], ret['three']['changes']

    # The state watching 'one' should have been run due to changes
    assert ret['two']['comment'] == 'Success!', ret['two']['comment']

    # The state watching 'three' should not have been run
    not_run = 'State was not run because none of the onchanges reqs changed'
    assert ret['four']['comment'] == not_run, ret['four']['comment']
@with_tempfile(create=False)
@with_tempfile(create=False)
def test_file_managed_prereq(self, file1, file2):
    '''
    Test file.managed state with prereq
    '''
    source = 'salt://testfile'
    pillar = {
        'file1': file1,
        'file2': file2,
        'source': source,
        'req': 'prereq',
    }

    # Lay down the file used in the below SLS to ensure that when it is
    # run, there are no changes.
    self.run_state('file.managed', name=file2, source=source)

    raw_ret = self.run_function(
        'state.apply',
        mods='onchanges_prereq',
        pillar=pillar,
        test=True,
    )
    ret = self.repack_state_returns(raw_ret)

    # In test mode 'one' must report None and 'three' must report True
    assert ret['one']['result'] is None, ret['one']['result']
    assert ret['three']['result'] is True, ret['three']['result']

    # The first file state should have changes, since a new file was
    # created. The other one should not, since we already created that file
    # before applying the SLS file.
    assert ret['one']['changes']
    assert not ret['three']['changes'], ret['three']['changes']

    # The state watching 'one' should have been run due to changes
    assert ret['two']['comment'] == 'Success!', ret['two']['comment']

    # The state watching 'three' should not have been run
    not_run = 'No changes detected'
    assert ret['four']['comment'] == not_run, ret['four']['comment']
def test_directory(self):
    '''
    file.directory

    The state must succeed and the directory must exist afterwards.
    '''
    target = os.path.join(RUNTIME_VARS.TMP, 'a_new_dir')
    result = self.run_state('file.directory', name=target)
    self.assertSaltTrueReturn(result)
    self.assertTrue(os.path.isdir(target))
def test_directory_symlink_dry_run(self):
    '''
    Ensure that symlinks are followed when file.directory is run with
    test=True
    '''
    tmp_dir = os.path.join(RUNTIME_VARS.TMP, 'pgdata')
    sym_dir = os.path.join(RUNTIME_VARS.TMP, 'pg_data')
    # Windows is given an owner, every other platform a mode
    if IS_WINDOWS:
        platform_kwargs = {'win_owner': 'Administrators'}
    else:
        platform_kwargs = {'mode': 700}
    try:
        os.mkdir(tmp_dir, 0o700)
        self.run_function('file.symlink', [tmp_dir, sym_dir])
        ret = self.run_state(
            'file.directory',
            test=True,
            name=sym_dir,
            follow_symlinks=True,
            **platform_kwargs
        )
        self.assertSaltTrueReturn(ret)
    finally:
        # Remove the real directory first, then the link pointing at it
        if os.path.isdir(tmp_dir):
            self.run_function('file.remove', [tmp_dir])
        if os.path.islink(sym_dir):
            self.run_function('file.remove', [sym_dir])
@pytest.mark.skip_if_not_root
@skipIf(IS_WINDOWS, 'Mode not available in Windows')
def test_directory_max_depth(self):
    '''
    file.directory

    Test the max_depth option by iteratively increasing the depth and
    checking that no changes deeper than max_depth have been attempted
    '''
    def _mode_of(path):
        '''
        Return a string octal representation of the permissions for path
        '''
        perm_bits = os.stat(path).st_mode & 0o777
        return salt.utils.files.normalize_mode(oct(perm_bits))

    top = os.path.join(RUNTIME_VARS.TMP, 'top_dir')
    sub = os.path.join(top, 'sub_dir')
    subsub = os.path.join(sub, 'sub_sub_dir')
    tree = [top, sub, subsub]

    initial_mode = '0111'
    changed_mode = '0555'
    # Expected modes of the not-yet-touched directories for each depth,
    # used on Python >= 3.7 (see the comment in the loop below)
    py37_modes = {
        0: {sub: '0755', subsub: '0111'},
        1: {sub: '0111', subsub: '0111'},
        2: {sub: '0111', subsub: '0111'},
    }

    if not os.path.isdir(subsub):
        os.makedirs(subsub, int(initial_mode, 8))

    try:
        for depth in range(3):
            ret = self.run_state(
                'file.directory',
                name=top,
                max_depth=depth,
                dir_mode=changed_mode,
                recurse=['mode'],
            )
            self.assertSaltTrueReturn(ret)

            boundary = depth + 1
            # Everything down to max_depth must carry the new mode
            for changed_dir in tree[:boundary]:
                self.assertEqual(changed_mode, _mode_of(changed_dir))
            # Everything deeper must still carry its initial mode
            for untouched_dir in tree[boundary:]:
                # Beginning in Python 3.7, os.makedirs no longer sets
                # the mode of intermediate directories to the mode that
                # is passed.
                if sys.version_info >= (3, 7):
                    expected_mode = py37_modes[depth][untouched_dir]
                else:
                    expected_mode = initial_mode
                self.assertEqual(expected_mode, _mode_of(untouched_dir))
    finally:
        shutil.rmtree(top)
def test_test_directory(self):
    '''
    file.directory with test=True

    The return must be a "None" result and the directory must not be
    created.
    '''
    target = os.path.join(RUNTIME_VARS.TMP, 'a_not_dir')
    result = self.run_state('file.directory', name=target, test=True)
    self.assertSaltNoneReturn(result)
    self.assertFalse(os.path.isdir(target))
@with_tempdir()
def test_directory_clean(self, base_dir):
    '''
    file.directory with clean=True
    '''
    target = os.path.join(base_dir, 'directory_clean_dir')
    os.mkdir(target)

    # A stray file directly inside the managed directory
    stray_file = os.path.join(target, 'strayfile')
    with salt.utils.files.fopen(stray_file, 'w'):
        pass

    # A stray subdirectory holding one more stray file
    stray_dir = os.path.join(target, 'straydir')
    if not os.path.isdir(stray_dir):
        os.makedirs(stray_dir)
    nested_stray = os.path.join(stray_dir, 'strayfile2')
    with salt.utils.files.fopen(nested_stray, 'w'):
        pass

    result = self.run_state('file.directory', name=target, clean=True)
    self.assertSaltTrueReturn(result)
    # Both stray entries must be gone, the directory itself must remain
    for removed in (stray_file, stray_dir):
        self.assertFalse(os.path.exists(removed))
    self.assertTrue(os.path.isdir(target))
def test_directory_is_idempotent(self):
    '''
    Ensure the file.directory state produces no changes when rerun.
    '''
    target = os.path.join(RUNTIME_VARS.TMP, 'a_dir_twice')
    first_kwargs = {'name': target}
    second_kwargs = {'name': target}
    if IS_WINDOWS:
        # On Windows the first run passes DOMAIN\user as the owner and
        # the rerun passes the bare user name
        username = os.environ.get('USERNAME', 'Administrators')
        domain = os.environ.get('USERDOMAIN', '')
        first_kwargs['win_owner'] = '{0}\\{1}'.format(domain, username)
        second_kwargs['win_owner'] = username

    first = self.run_state('file.directory', **first_kwargs)
    self.assertSaltTrueReturn(first)

    second = self.run_state('file.directory', **second_kwargs)
    self.assertSaltTrueReturn(second)
    self.assertSaltStateChangesEqual(second, {})
@with_tempdir()
def test_directory_clean_exclude(self, base_dir):
    '''
    file.directory with clean=True and exclude_pat set
    '''
    target = os.path.join(base_dir, 'directory_clean_dir')
    if not os.path.isdir(target):
        os.makedirs(target)

    stray_file = os.path.join(target, 'strayfile')
    with salt.utils.files.fopen(stray_file, 'w'):
        pass

    stray_dir = os.path.join(target, 'straydir')
    if not os.path.isdir(stray_dir):
        os.makedirs(stray_dir)

    # Two files inside straydir: one stray, one matched by the pattern
    nested_stray = os.path.join(stray_dir, 'strayfile2')
    kept_file = os.path.join(stray_dir, 'keepfile')
    for path in (nested_stray, kept_file):
        with salt.utils.files.fopen(path, 'w'):
            pass

    # The pattern uses the platform's path separator
    if IS_WINDOWS:
        pattern = 'E@^straydir(|\\\\keepfile)$'
    else:
        pattern = 'E@^straydir(|/keepfile)$'

    result = self.run_state(
        'file.directory',
        name=target,
        clean=True,
        exclude_pat=pattern,
    )
    self.assertSaltTrueReturn(result)
    self.assertFalse(os.path.exists(stray_file))
    self.assertFalse(os.path.exists(nested_stray))
    self.assertTrue(os.path.exists(kept_file))
@skipIf(IS_WINDOWS, 'Skip on windows')
@with_tempdir()
def test_test_directory_clean_exclude(self, base_dir):
    '''
    file.directory with test=True, clean=True and exclude_pat set

    Nothing may be removed in test mode, and the state comment must list
    the stray files but not the file protected by exclude_pat.

    Skipped on windows because clean and exclude_pat not supported by
    salt.states.file._check_directory_win
    '''
    name = os.path.join(base_dir, 'directory_clean_dir')
    os.mkdir(name)

    strayfile = os.path.join(name, 'strayfile')
    with salt.utils.files.fopen(strayfile, 'w'):
        pass

    straydir = os.path.join(name, 'straydir')
    if not os.path.isdir(straydir):
        os.makedirs(straydir)

    strayfile2 = os.path.join(straydir, 'strayfile2')
    with salt.utils.files.fopen(strayfile2, 'w'):
        pass

    keepfile = os.path.join(straydir, 'keepfile')
    with salt.utils.files.fopen(keepfile, 'w'):
        pass

    # The test is skipped on Windows, so only the POSIX separator form
    # of the pattern is needed here.
    exclude_pat = 'E@^straydir(|/keepfile)$'

    ret = self.run_state('file.directory',
                         test=True,
                         name=name,
                         clean=True,
                         exclude_pat=exclude_pat)

    comment = next(six.itervalues(ret))['comment']

    self.assertSaltNoneReturn(ret)
    # Test mode: every file must still be on disk
    self.assertTrue(os.path.exists(strayfile))
    self.assertTrue(os.path.exists(strayfile2))
    self.assertTrue(os.path.exists(keepfile))

    # Only the stray files are mentioned in the comment
    self.assertIn(strayfile, comment)
    self.assertIn(strayfile2, comment)
    self.assertNotIn(keepfile, comment)
@with_tempdir()
def test_directory_clean_require_in(self, name):
    '''
    file.directory test with clean=True and require_in file
    '''
    sls_name = 'file-FileTest-test_directory_clean_require_in'
    sls_path = os.path.join(RUNTIME_VARS.BASE_FILES, sls_name + '.sls')

    # File that the clean directory state is expected to remove
    wrong_file = os.path.join(name, 'wrong')
    with salt.utils.files.fopen(wrong_file, 'w') as handle:
        handle.write('foo')

    # File managed by a state that declares require_in on the directory
    good_file = os.path.join(name, 'bar')

    sls_template = '''\
        some_dir:
          file.directory:
            - name: {name}
            - clean: true
        {good_file}:
          file.managed:
            - require_in:
              - file: some_dir
        '''
    with salt.utils.files.fopen(sls_path, 'w') as handle:
        self.addCleanup(os.remove, sls_path)
        handle.write(textwrap.dedent(sls_template.format(name=name, good_file=good_file)))

    self.run_function('state.sls', [sls_name])
    self.assertTrue(os.path.exists(good_file))
    self.assertFalse(os.path.exists(wrong_file))
@with_tempdir()
def test_directory_clean_require_in_with_id(self, name):
    '''
    file.directory test with clean=True and require_in file with an ID
    different from the file name
    '''
    sls_name = 'file-FileTest-test_directory_clean_require_in_with_id'
    sls_path = os.path.join(RUNTIME_VARS.BASE_FILES, sls_name + '.sls')

    # File that the clean directory state is expected to remove
    wrong_file = os.path.join(name, 'wrong')
    with salt.utils.files.fopen(wrong_file, 'w') as handle:
        handle.write('foo')

    # File managed under the ID 'some_file', with require_in on the directory
    good_file = os.path.join(name, 'bar')

    sls_template = '''\
        some_dir:
          file.directory:
            - name: {name}
            - clean: true
        some_file:
          file.managed:
            - name: {good_file}
            - require_in:
              - file: some_dir
        '''
    with salt.utils.files.fopen(sls_path, 'w') as handle:
        self.addCleanup(os.remove, sls_path)
        handle.write(textwrap.dedent(sls_template.format(name=name, good_file=good_file)))

    self.run_function('state.sls', [sls_name])
    self.assertTrue(os.path.exists(good_file))
    self.assertFalse(os.path.exists(wrong_file))
@skipIf(salt.utils.platform.is_darwin(), 'WAR ROOM TEMPORARY SKIP, Test is flaky on macosx')
@with_tempdir()
def test_directory_clean_require_with_name(self, name):
    '''
    file.directory test with clean=True and require with a file state
    relatively to the state's name, not its ID.
    '''
    # Use an SLS name unique to this test. It was previously copy-pasted
    # from test_directory_clean_require_in_with_id, so both tests wrote
    # and removed the very same SLS file under BASE_FILES.
    state_name = 'file-FileTest-test_directory_clean_require_with_name'
    state_filename = state_name + '.sls'
    state_file = os.path.join(RUNTIME_VARS.BASE_FILES, state_filename)

    # File that the clean directory state is expected to remove
    wrong_file = os.path.join(name, "wrong")
    with salt.utils.files.fopen(wrong_file, "w") as fp:
        fp.write("foo")

    # File managed by the state the directory state requires
    good_file = os.path.join(name, "bar")

    with salt.utils.files.fopen(state_file, 'w') as fp:
        self.addCleanup(os.remove, state_file)
        fp.write(textwrap.dedent('''\
            some_dir:
              file.directory:
                - name: {name}
                - clean: true
                - require:
                  # This requirement refers to the name of the following
                  # state, not its ID.
                  - file: {good_file}
            some_file:
              file.managed:
                - name: {good_file}
            '''.format(name=name, good_file=good_file)))

    self.run_function('state.sls', [state_name])
    self.assertTrue(os.path.exists(good_file))
    self.assertFalse(os.path.exists(wrong_file))
def test_directory_broken_symlink(self):
    '''
    Ensure that file.directory works even if a directory
    contains broken symbolic link
    '''
    tmp_dir = os.path.join(RUNTIME_VARS.TMP, 'foo')
    # The link target is never created, so the link is broken
    null_file = '{0}/null'.format(tmp_dir)
    broken_link = '{0}/broken'.format(tmp_dir)
    if IS_WINDOWS:
        platform_kwargs = {
            'follow_symlinks': True,
            'win_owner': 'Administrators',
        }
    else:
        platform_kwargs = {'file_mode': 644, 'dir_mode': 755}
    try:
        os.mkdir(tmp_dir, 0o700)
        self.run_function('file.symlink', [null_file, broken_link])
        ret = self.run_state(
            'file.directory',
            name=tmp_dir,
            recurse=['mode'],
            **platform_kwargs
        )
        self.assertSaltTrueReturn(ret)
    finally:
        if os.path.isdir(tmp_dir):
            self.run_function('file.remove', [tmp_dir])
@with_tempdir(create=False)
def test_recurse(self, name):
    '''
    file.recurse

    After the state succeeds, 36/scene must exist below the target.
    '''
    result = self.run_state('file.recurse', name=name, source='salt://grail')
    self.assertSaltTrueReturn(result)
    expected_file = os.path.join(name, '36', 'scene')
    self.assertTrue(os.path.isfile(expected_file))
@with_tempdir(create=False)
@with_tempdir(create=False)
def test_recurse_specific_env(self, dir1, dir2):
    '''
    file.recurse passing __env__ and, separately, saltenv
    '''
    # First the __env__ spelling into dir1, then saltenv into dir2
    scenarios = (
        (dir1, {'__env__': 'prod'}),
        (dir2, {'saltenv': 'prod'}),
    )
    for target, env_kwargs in scenarios:
        result = self.run_state(
            'file.recurse',
            name=target,
            source='salt://holy',
            **env_kwargs
        )
        self.assertSaltTrueReturn(result)
        self.assertTrue(os.path.isfile(os.path.join(target, '32', 'scene')))
@with_tempdir(create=False)
@with_tempdir(create=False)
def test_recurse_specific_env_in_url(self, dir1, dir2):
    '''
    file.recurse passing the saltenv as a query string in the source URL
    '''
    # The same call is made once for each of the two target directories
    for target in (dir1, dir2):
        result = self.run_state(
            'file.recurse',
            name=target,
            source='salt://holy?saltenv=prod',
        )
        self.assertSaltTrueReturn(result)
        self.assertTrue(os.path.isfile(os.path.join(target, '32', 'scene')))
@with_tempdir(create=False)
def test_test_recurse(self, name):
    '''
    file.recurse test interface

    With test=True the return must be a "None" result and nothing may
    be created at the target.
    '''
    result = self.run_state(
        'file.recurse',
        test=True,
        name=name,
        source='salt://grail',
    )
    self.assertSaltNoneReturn(result)
    self.assertFalse(os.path.isfile(os.path.join(name, '36', 'scene')))
    self.assertFalse(os.path.exists(name))
@with_tempdir(create=False)
@with_tempdir(create=False)
def test_test_recurse_specific_env(self, dir1, dir2):
    '''
    file.recurse test interface, passing __env__ and, separately, saltenv
    '''
    # First the __env__ spelling against dir1, then saltenv against dir2
    scenarios = (
        (dir1, {'__env__': 'prod'}),
        (dir2, {'saltenv': 'prod'}),
    )
    for target, env_kwargs in scenarios:
        result = self.run_state(
            'file.recurse',
            test=True,
            name=target,
            source='salt://holy',
            **env_kwargs
        )
        self.assertSaltNoneReturn(result)
        # Test mode: nothing may have been created
        self.assertFalse(os.path.isfile(os.path.join(target, '32', 'scene')))
        self.assertFalse(os.path.exists(target))
@with_tempdir(create=False)
def test_recurse_template(self, name):
    '''
    file.recurse with jinja template enabled
    '''
    marker = 'TEMPLATE TEST STRING'
    result = self.run_state(
        'file.recurse',
        name=name,
        source='salt://grail',
        template='jinja',
        defaults={'spam': marker},
    )
    self.assertSaltTrueReturn(result)

    # The value passed through defaults must show up in scene33
    rendered_path = os.path.join(name, 'scene33')
    with salt.utils.files.fopen(rendered_path, 'r') as handle:
        rendered = handle.read()
    self.assertIn(marker, rendered)
@with_tempdir()
def test_recurse_clean(self, name):
    '''
    file.recurse with clean=True
    '''
    stray_file = os.path.join(name, 'strayfile')
    # Corner cases: replacing file with a directory and vice versa
    file_to_become_dir = os.path.join(name, '36')
    dir_to_become_file = os.path.join(name, 'scene33')

    for path in (stray_file, file_to_become_dir):
        with salt.utils.files.fopen(path, 'w'):
            pass
    os.makedirs(dir_to_become_file)

    result = self.run_state(
        'file.recurse',
        name=name,
        source='salt://grail',
        clean=True,
    )
    self.assertSaltTrueReturn(result)
    self.assertFalse(os.path.exists(stray_file))
    self.assertTrue(os.path.isfile(os.path.join(file_to_become_dir, 'scene')))
    self.assertTrue(os.path.isfile(dir_to_become_file))
@with_tempdir()
def test_recurse_clean_specific_env(self, name):
    '''
    file.recurse with clean=True and __env__=prod
    '''
    stray_file = os.path.join(name, 'strayfile')
    # Corner cases: replacing file with a directory and vice versa
    file_to_become_dir = os.path.join(name, '32')
    dir_to_become_file = os.path.join(name, 'scene34')

    for path in (stray_file, file_to_become_dir):
        with salt.utils.files.fopen(path, 'w'):
            pass
    os.makedirs(dir_to_become_file)

    result = self.run_state(
        'file.recurse',
        name=name,
        source='salt://holy',
        clean=True,
        __env__='prod',
    )
    self.assertSaltTrueReturn(result)
    self.assertFalse(os.path.exists(stray_file))
    self.assertTrue(os.path.isfile(os.path.join(file_to_become_dir, 'scene')))
    self.assertTrue(os.path.isfile(dir_to_become_file))
@skipIf(IS_WINDOWS, 'Skip on windows')
@with_tempdir()
def test_recurse_issue_34945(self, base_dir):
    '''
    This tests the case where the source dir for the file.recurse state
    does not contain any files (only subdirectories), and the dir_mode is
    being managed. For a long time, this corner case resulted in the top
    level of the destination directory being created with the wrong initial
    permissions, a problem that would be corrected later on in the
    file.recurse state via running state.directory. However, the
    file.directory state only gets called when there are files to be
    managed in that directory, and when the source directory contains only
    subdirectories, the incorrectly-set initial perms would not be
    repaired.

    This was fixed in https://github.com/saltstack/salt/pull/35309

    Skipped on windows because dir_mode is not supported.
    '''
    issue_dir = 'issue-34945'
    wanted_mode = '2775'
    target = os.path.join(base_dir, issue_dir)

    result = self.run_state(
        'file.recurse',
        name=target,
        source='salt://' + issue_dir,
        dir_mode=wanted_mode,
    )
    self.assertSaltTrueReturn(result)

    # Compare the last four octal digits of the mode with the request
    mode_bits = stat.S_IMODE(os.stat(target).st_mode)
    found_mode = oct(mode_bits)[-4:]
    self.assertEqual(wanted_mode, found_mode)
@with_tempdir(create=False)
def test_recurse_issue_40578(self, name):
    '''
    This ensures that the state doesn't raise an exception when it
    encounters a file with a unicode filename in the process of invoking
    file.source_list.
    '''
    result = self.run_state(
        'file.recurse',
        name=name,
        source='salt://соль',
    )
    self.assertSaltTrueReturn(result)

    if not (six.PY2 and IS_WINDOWS):
        listing = salt.utils.data.decode(os.listdir(name), normalize=True)
    else:
        # Providing unicode to os.listdir so that we avoid having listdir
        # try to decode the filenames using the systemencoding on windows
        # python 2.
        listing = os.listdir(name.decode('utf-8'))

    expected = ['foo.txt', 'спам.txt', 'яйца.txt']
    self.assertEqual(sorted(listing), sorted(expected))
@with_tempfile()
def test_replace(self, name):
    '''
    file.replace

    Replaces 'change' with 'salt' in a file holding 'change_me' and
    checks both the file content and the state return.
    '''
    with salt.utils.files.fopen(name, 'w+') as handle:
        handle.write('change_me')

    result = self.run_state(
        'file.replace',
        name=name,
        pattern='change',
        repl='salt',
        backup=False,
    )

    with salt.utils.files.fopen(name, 'r') as handle:
        content = handle.read()
    self.assertIn('salt', content)
    self.assertSaltTrueReturn(result)
@with_tempdir()
def test_replace_issue_18612(self, base_dir):
    '''
    Regression test for the file.replace behaviour described in #18612,
    where 'prepend_if_not_found' / 'append_if_not_found' made the file
    grow on every run because the state did not check whether the change
    had already been applied.

    Case: the file holds a single commented line. After running the state
    three times the line must be uncommented and nothing else may change.
    '''
    test_name = 'test_replace_issue_18612'
    target = os.path.join(base_dir, test_name)
    with salt.utils.files.fopen(target, 'w+') as handle:
        handle.write('# en_US.UTF-8')

    # Run the very same state three times in a row
    results = [
        self.run_state(
            'file.replace',
            name=target,
            pattern='^# en_US.UTF-8$',
            repl='en_US.UTF-8',
            append_if_not_found=True,
        )
        for _ in range(0, 3)
    ]

    # The file must still consist of exactly one line
    with salt.utils.files.fopen(target, 'r') as handle:
        line_count = sum(1 for _ in handle)
    self.assertTrue((line_count == 1))

    # The comment marker must be gone
    with salt.utils.files.fopen(target, 'r') as handle:
        self.assertTrue(handle.read().startswith('en_US.UTF-8'))

    # Every single run must have reported success
    for run_result in results:
        self.assertSaltTrueReturn(run_result)
@with_tempdir()
def test_replace_issue_18612_prepend(self, base_dir):
    '''
    Regression test for the file.replace behaviour described in #18612
    (file growing on every run with 'prepend_if_not_found' /
    'append_if_not_found').

    Case: the fixture has several lines, none of which match the pattern
    or the replacement. The replacement is expected to be prepended, and
    the result after three runs must equal the '.out' fixture.
    '''
    test_name = 'test_replace_issue_18612_prepend'
    fixture_dir = os.path.join(RUNTIME_VARS.FILES, 'file.replace')
    path_in = os.path.join(fixture_dir, '{0}.in'.format(test_name))
    path_out = os.path.join(fixture_dir, '{0}.out'.format(test_name))
    path_test = os.path.join(base_dir, test_name)

    # Start from a fresh copy of the input fixture
    shutil.copyfile(path_in, path_test)

    results = [
        self.run_state(
            'file.replace',
            name=path_test,
            pattern='^# en_US.UTF-8$',
            repl='en_US.UTF-8',
            prepend_if_not_found=True,
        )
        for _ in range(0, 3)
    ]

    # Resulting file matches the expected output fixture
    self.assertTrue(filecmp.cmp(path_test, path_out))
    # The backup holds the untouched input
    self.assertTrue(filecmp.cmp(path_test + '.bak', path_in))
    # Every run reported success
    for run_result in results:
        self.assertSaltTrueReturn(run_result)
@with_tempdir()
def test_replace_issue_18612_append(self, base_dir):
    '''
    Regression test for the file.replace behaviour described in #18612
    (file growing on every run with 'prepend_if_not_found' /
    'append_if_not_found').

    Case: the fixture has several lines, none of which match the pattern
    or the replacement. The replacement is expected to be appended, and
    the result after three runs must equal the '.out' fixture.
    '''
    test_name = 'test_replace_issue_18612_append'
    fixture_dir = os.path.join(RUNTIME_VARS.FILES, 'file.replace')
    path_in = os.path.join(fixture_dir, '{0}.in'.format(test_name))
    path_out = os.path.join(fixture_dir, '{0}.out'.format(test_name))
    path_test = os.path.join(base_dir, test_name)

    # Start from a fresh copy of the input fixture
    shutil.copyfile(path_in, path_test)

    results = [
        self.run_state(
            'file.replace',
            name=path_test,
            pattern='^# en_US.UTF-8$',
            repl='en_US.UTF-8',
            append_if_not_found=True,
        )
        for _ in range(0, 3)
    ]

    # Resulting file matches the expected output fixture
    self.assertTrue(filecmp.cmp(path_test, path_out))
    # The backup holds the untouched input
    self.assertTrue(filecmp.cmp(path_test + '.bak', path_in))
    # Every run reported success
    for run_result in results:
        self.assertSaltTrueReturn(run_result)
@with_tempdir()
def test_replace_issue_18612_append_not_found_content(self, base_dir):
    '''
    Regression test for the file.replace behaviour described in #18612
    (file growing on every run with 'prepend_if_not_found' /
    'append_if_not_found').

    Case: the fixture has several lines, none of which match the pattern
    or the replacement. The 'not_found_content' value is expected to be
    appended, and the result after three runs must equal the '.out'
    fixture.
    '''
    test_name = 'test_replace_issue_18612_append_not_found_content'
    fixture_dir = os.path.join(RUNTIME_VARS.FILES, 'file.replace')
    path_in = os.path.join(fixture_dir, '{0}.in'.format(test_name))
    path_out = os.path.join(fixture_dir, '{0}.out'.format(test_name))
    path_test = os.path.join(base_dir, test_name)

    # Start from a fresh copy of the input fixture
    shutil.copyfile(path_in, path_test)

    results = []
    for _ in range(0, 3):
        run_result = self.run_state(
            'file.replace',
            name=path_test,
            pattern='^# en_US.UTF-8$',
            repl='en_US.UTF-8',
            append_if_not_found=True,
            not_found_content='THIS LINE WASN\'T FOUND! SO WE\'RE APPENDING IT HERE!',
        )
        results.append(run_result)

    # Resulting file matches the expected output fixture
    self.assertTrue(filecmp.cmp(path_test, path_out))
    # The backup holds the untouched input
    self.assertTrue(filecmp.cmp(path_test + '.bak', path_in))
    # Every run reported success
    for run_result in results:
        self.assertSaltTrueReturn(run_result)
@with_tempdir()
def test_replace_issue_18612_change_mid_line_with_comment(self, base_dir):
    '''
    Regression test for the file.replace behaviour described in #18612
    (file growing on every run with 'prepend_if_not_found' /
    'append_if_not_found').

    Case: the fixture holds five key=value pairs. The commented pair
    '#foo=bar' must become 'foo=salt', i.e. the value changes and the
    leading comment character is dropped.
    '''
    test_name = 'test_replace_issue_18612_change_mid_line_with_comment'
    fixture_dir = os.path.join(RUNTIME_VARS.FILES, 'file.replace')
    path_in = os.path.join(fixture_dir, '{0}.in'.format(test_name))
    path_out = os.path.join(fixture_dir, '{0}.out'.format(test_name))
    path_test = os.path.join(base_dir, test_name)

    # Start from a fresh copy of the input fixture
    shutil.copyfile(path_in, path_test)

    results = [
        self.run_state(
            'file.replace',
            name=path_test,
            pattern='^#foo=bar($|(?=\r\n))',
            repl='foo=salt',
            append_if_not_found=True,
        )
        for _ in range(0, 3)
    ]

    # Resulting file matches the expected output fixture
    self.assertTrue(filecmp.cmp(path_test, path_out))
    # The backup holds the untouched input
    self.assertTrue(filecmp.cmp(path_test + '.bak', path_in))
    # Every run reported success
    for run_result in results:
        self.assertSaltTrueReturn(run_result)
@with_tempdir()
def test_replace_issue_18841_no_changes(self, base_dir):
    '''
    Test the (mis-)behaviour of file.replace as described in #18841:

    Using file.replace in a way which shouldn't modify the file at all
    results in changed mtime of the original file and a backup file being
    created.

    Case description:

    * The tested file contains multiple lines
    * The tested file contains a line already matching the replacement
      (no change needed)
    * The tested file's content shouldn't change at all
    * The tested file's mtime shouldn't change at all
    * No backup file should be created
    '''
    test_name = 'test_replace_issue_18841_no_changes'
    path_in = os.path.join(
        RUNTIME_VARS.FILES, 'file.replace', '{0}.in'.format(test_name)
    )
    path_test = os.path.join(base_dir, test_name)

    # create test file based on initial template
    shutil.copyfile(path_in, path_test)

    # get (m|a)time of file
    fstats_orig = os.stat(path_test)

    # define how far we predate the file
    age = 5 * 24 * 60 * 60

    # set (a|m)time of file 5 days into the past; os.utime expects the
    # tuple in (atime, mtime) order
    os.utime(
        path_test,
        (fstats_orig.st_atime - age, fstats_orig.st_mtime - age),
    )

    # Snapshot the timestamps as actually stored, so the comparison below
    # does not depend on float rounding or on the timestamp granularity
    # of the filesystem.
    fstats_aged = os.stat(path_test)

    ret = self.run_state(
        'file.replace',
        name=path_test,
        pattern='^hello world$',
        repl='goodbye world',
        show_changes=True,
        flags=['IGNORECASE'],
        backup=False,
    )

    # get (m|a)time of file
    fstats_post = os.stat(path_test)

    # ensure, the file content didn't change
    self.assertTrue(filecmp.cmp(path_in, path_test))

    # ensure no backup file was created
    self.assertFalse(os.path.exists(path_test + '.bak'))

    # ensure the file's mtime didn't change. This has to be assertEqual:
    # assertTrue(a, b) only checks the truthiness of `a` and uses `b` as
    # the failure message, so it could never catch a changed mtime.
    self.assertEqual(fstats_post.st_mtime, fstats_aged.st_mtime)

    # ensure, all 'file.replace' runs reported success
    self.assertSaltTrueReturn(ret)
def test_serialize(self):
    '''
    Run file.serialize with the json formatter and compare the written
    file, byte for byte, against the expected serialized and formatted
    text.
    '''
    path_test = os.path.join(RUNTIME_VARS.TMP, 'test_serialize')
    dataset = {
        'name': 'naive',
        'description': 'A basic test',
        'a_list': ['first_element', 'second_element'],
        'finally': 'the last item',
    }
    ret = self.run_state(
        'file.serialize',
        name=path_test,
        dataset=dataset,
        formatter='json',
    )

    with salt.utils.files.fopen(path_test, 'rb') as handle:
        raw = handle.read()
    serialized_file = salt.utils.stringutils.to_unicode(raw)

    # The expected text is joined with LF regardless of platform, because
    # the JSON serializer emits LF line endings.
    expected_lines = [
        '{',
        ' "a_list": [',
        ' "first_element",',
        ' "second_element"',
        ' ],',
        ' "description": "A basic test",',
        ' "finally": "the last item",',
        ' "name": "naive"',
        '}',
        '',
    ]
    expected_file = '\n'.join(expected_lines)
    self.assertEqual(serialized_file, expected_file)
@with_tempfile(create=False)
def test_serializer_deserializer_opts(self, name):
    '''
    Exercise the serializer_opts and deserializer_opts arguments of
    file.serialize using the configparser formatter.
    '''
    data1 = {'foo': {'bar': '%(x)s'}}
    data2 = {'foo': {'abc': 123}}
    merged = {'foo': {'y': 'not_used', 'x': 'baz', 'abc': 123, 'bar': u'baz'}}

    # First pass: deserializer_opts without merge_if_exists
    first = self.run_state(
        'file.serialize',
        name=name,
        dataset=data1,
        formatter='configparser',
        deserializer_opts=[{'defaults': {'y': 'not_used'}}],
    )
    first = first[next(iter(first))]
    assert first['result'], first
    # Passing deserializer_opts while merge_if_exists is not True is
    # expected to produce a warning.
    assert 'warnings' in first

    # Second pass: merge_if_exists together with both option sets. The
    # deserializer defaults drive interpolation of the %(x)s written in
    # the first pass, so 'bar' should come back as 'baz'.
    second = self.run_state(
        'file.serialize',
        name=name,
        dataset=data2,
        formatter='configparser',
        merge_if_exists=True,
        serializer_opts=[{'defaults': {'y': 'not_used'}}],
        deserializer_opts=[{'defaults': {'x': 'baz'}}],
    )
    second = second[next(iter(second))]
    assert second['result'], second

    with salt.utils.files.fopen(name) as handle:
        serialized_data = salt.serializers.configparser.deserialize(handle)

    # Logged to make a failing comparison easier to diagnose
    log.debug('serialized_data = %r', serialized_data)
    log.debug('merged = %r', merged)

    foo_actual = serialized_data['foo']
    foo_expected = merged['foo']
    # serializing with a default of 'y' adds y = not_used to section foo
    assert foo_actual['y'] == foo_expected['y']
    # deserializing with the default x = baz interpolates %(x)s, so bar = baz
    assert foo_actual['bar'] == foo_expected['bar']
@with_tempdir()
def test_replace_issue_18841_omit_backup(self, base_dir):
    '''
    Test the (mis-)behaviour of file.replace as described in #18841:

    Using file.replace in a way which shouldn't modify the file at all
    results in changed mtime of the original file and a backup file being
    created.

    Case description:

    * The tested file contains multiple lines
    * The tested file contains a line already matching the replacement
      (no change needed)
    * The tested file's content shouldn't change at all
    * The tested file's mtime shouldn't change at all
    * No backup file should be created, although backup=False isn't
      explicitly defined
    '''
    test_name = 'test_replace_issue_18841_omit_backup'
    path_in = os.path.join(
        RUNTIME_VARS.FILES, 'file.replace', '{0}.in'.format(test_name)
    )
    path_test = os.path.join(base_dir, test_name)

    # create test file based on initial template
    shutil.copyfile(path_in, path_test)

    # get (m|a)time of file
    fstats_orig = os.stat(path_test)

    # define how far we predate the file
    age = 5 * 24 * 60 * 60

    # set (a|m)time of file 5 days into the past; os.utime expects the
    # tuple in (atime, mtime) order
    os.utime(
        path_test,
        (fstats_orig.st_atime - age, fstats_orig.st_mtime - age),
    )

    # Snapshot the timestamps as actually stored, so the comparison below
    # does not depend on float rounding or on the timestamp granularity
    # of the filesystem.
    fstats_aged = os.stat(path_test)

    ret = self.run_state(
        'file.replace',
        name=path_test,
        pattern='^hello world$',
        repl='goodbye world',
        show_changes=True,
        flags=['IGNORECASE'],
    )

    # get (m|a)time of file
    fstats_post = os.stat(path_test)

    # ensure, the file content didn't change
    self.assertTrue(filecmp.cmp(path_in, path_test))

    # ensure no backup file was created
    self.assertFalse(os.path.exists(path_test + '.bak'))

    # ensure the file's mtime didn't change. This has to be assertEqual:
    # assertTrue(a, b) only checks the truthiness of `a` and uses `b` as
    # the failure message, so it could never catch a changed mtime.
    self.assertEqual(fstats_post.st_mtime, fstats_aged.st_mtime)

    # ensure, all 'file.replace' runs reported success
    self.assertSaltTrueReturn(ret)
@with_tempfile()
def test_comment(self, name):
    '''
    file.comment: comment a line, repeat the state, and check the
    test=True results before and after
    '''
    def _assert_commented():
        # The file content must begin with the commented form of the line
        with salt.utils.files.fopen(name, 'r') as handle:
            self.assertTrue(handle.read().startswith('#comment'))

    # Seed the file with one uncommented line
    with salt.utils.files.fopen(name, 'w+') as handle:
        handle.write('comment_me')

    # Dry run on the pristine file: a pending change yields a None result
    dry_run = self.run_state('file.comment', test=True, name=name, regex='^comment')
    self.assertSaltNoneReturn(dry_run)

    # First real run comments the line
    first = self.run_state('file.comment', name=name, regex='^comment')
    self.assertSaltTrueReturn(first)
    _assert_commented()

    # Second real run still succeeds and leaves the line commented
    second = self.run_state('file.comment', name=name, regex='^comment')
    self.assertSaltTrueReturn(second)
    _assert_commented()

    # Dry run on the already commented file now yields True, not None
    dry_run = self.run_state('file.comment', test=True, name=name, regex='^comment')
    self.assertSaltTrueReturn(dry_run)
@with_tempfile()
def test_test_comment(self, name):
    '''
    file.comment with test=True must leave the file untouched
    '''
    with salt.utils.files.fopen(name, 'w+') as handle:
        handle.write('comment_me')

    dry_run = self.run_state(
        'file.comment',
        test=True,
        name=name,
        regex='.*comment.*',
    )

    # Nothing may have been commented by the dry run
    with salt.utils.files.fopen(name, 'r') as handle:
        self.assertNotIn('#comment', handle.read())
    self.assertSaltNoneReturn(dry_run)
@with_tempfile()
def test_uncomment(self, name):
    '''
    file.uncomment: remove the comment marker from a matching line
    '''
    with salt.utils.files.fopen(name, 'w+') as handle:
        handle.write('#comment_me')

    state_ret = self.run_state(
        'file.uncomment',
        name=name,
        regex='^comment',
    )

    # The commented form must no longer be present
    with salt.utils.files.fopen(name, 'r') as handle:
        self.assertNotIn('#comment', handle.read())
    self.assertSaltTrueReturn(state_ret)
@with_tempfile()
def test_test_uncomment(self, name):
    '''
    file.uncomment with test=True must leave the file untouched
    '''
    with salt.utils.files.fopen(name, 'w+') as handle:
        handle.write('#comment_me')

    dry_run = self.run_state(
        'file.uncomment',
        test=True,
        name=name,
        regex='^comment.*',
    )

    # The line must still be commented after the dry run
    with salt.utils.files.fopen(name, 'r') as handle:
        self.assertIn('#comment', handle.read())
    self.assertSaltNoneReturn(dry_run)
@with_tempfile()
def test_append(self, name):
    '''
    file.append: add text to an existing file
    '''
    with salt.utils.files.fopen(name, 'w+') as handle:
        handle.write('#salty!')

    state_ret = self.run_state('file.append', name=name, text='cheese')

    # The appended text must be present in the file
    with salt.utils.files.fopen(name, 'r') as handle:
        self.assertIn('cheese', handle.read())
    self.assertSaltTrueReturn(state_ret)
@with_tempfile()
def test_test_append(self, name):
    '''
    file.append with test=True must leave the file untouched
    '''
    with salt.utils.files.fopen(name, 'w+') as handle:
        handle.write('#salty!')

    dry_run = self.run_state(
        'file.append',
        test=True,
        name=name,
        text='cheese',
    )

    # The text must not have been written by the dry run
    with salt.utils.files.fopen(name, 'r') as handle:
        self.assertNotIn('cheese', handle.read())
    self.assertSaltNoneReturn(dry_run)
@with_tempdir()
def test_append_issue_1864_makedirs(self, base_dir):
    '''
    file.append should create the target file when it is missing and,
    with makedirs=True, the missing parent directories as well
    '''
    fname = 'append_issue_1864_makedirs'

    # Case 1: file does not exist yet, directly inside base_dir
    top_level = os.path.join(base_dir, fname)
    ret = self.run_state(
        'file.append', name=top_level, text='cheese', makedirs=True
    )
    self.assertSaltTrueReturn(ret)

    # Case 2: neither the nested directory nor the file exist yet
    nested = os.path.join(base_dir, 'issue_1864', fname)
    ret = self.run_state(
        'file.append', name=nested, text='cheese', makedirs=True
    )
    self.assertSaltTrueReturn(ret)

    # Case 3: parent directory exists by now, file does not, and
    # makedirs is left at its default
    sibling = os.path.join(base_dir, 'issue_1864', fname + '2')
    ret = self.run_state('file.append', name=sibling, text='cheese')
    self.assertSaltTrueReturn(ret)
    self.assertTrue(os.path.isfile(sibling))
@with_tempdir()
def test_prepend_issue_27401_makedirs(self, base_dir):
    '''
    file.prepend should create the target file when it is missing and,
    with makedirs=True, the missing parent directories as well
    '''
    fname = 'prepend_issue_27401'

    # Case 1: file does not exist yet, directly inside base_dir
    top_level = os.path.join(base_dir, fname)
    ret = self.run_state(
        'file.prepend', name=top_level, text='cheese', makedirs=True
    )
    self.assertSaltTrueReturn(ret)

    # Case 2: neither the nested directory nor the file exist yet
    nested = os.path.join(base_dir, 'issue_27401', fname)
    ret = self.run_state(
        'file.prepend', name=nested, text='cheese', makedirs=True
    )
    self.assertSaltTrueReturn(ret)

    # Case 3: parent directory exists by now, file does not, and
    # makedirs is left at its default
    sibling = os.path.join(base_dir, 'issue_27401', fname + '2')
    ret = self.run_state('file.prepend', name=sibling, text='cheese')
    self.assertSaltTrueReturn(ret)
    self.assertTrue(os.path.isfile(sibling))
@with_tempfile()
def test_touch(self, name):
    '''
    file.touch: the target must be a regular file afterwards
    '''
    state_ret = self.run_state('file.touch', name=name)
    target_is_file = os.path.isfile(name)
    self.assertTrue(target_is_file)
    self.assertSaltTrueReturn(state_ret)
@with_tempfile(create=False)
def test_test_touch(self, name):
    '''
    file.touch with test=True must not create the file
    '''
    dry_run = self.run_state('file.touch', test=True, name=name)
    target_is_file = os.path.isfile(name)
    self.assertFalse(target_is_file)
    self.assertSaltNoneReturn(dry_run)
@with_tempdir()
def test_touch_directory(self, base_dir):
    '''
    file.touch on an existing directory succeeds and leaves it a directory
    '''
    target_dir = os.path.join(base_dir, 'touch_test_dir')
    os.mkdir(target_dir)

    state_ret = self.run_state('file.touch', name=target_dir)

    self.assertSaltTrueReturn(state_ret)
    self.assertTrue(os.path.isdir(target_dir))
@with_tempdir()
def test_issue_2227_file_append(self, base_dir):
    '''
    Appending text that contains a percent symbol must succeed, and a
    second identical append must not change the file
    '''
    tmp_file_append = os.path.join(base_dir, 'test.append')
    percent_text = "HISTTIMEFORMAT='%F %T '"

    def _read_target():
        # Return the current content of the file under test
        with salt.utils.files.fopen(tmp_file_append, 'r') as handle:
            return handle.read()

    # Build the starting content with existing states
    self.run_state('file.touch', name=tmp_file_append)
    for source in ('salt://testappend/firstif', 'salt://testappend/secondif'):
        self.run_state('file.append', name=tmp_file_append, source=source)

    # The actual test
    try:
        ret = self.run_state(
            'file.append', name=tmp_file_append, text=percent_text
        )
        self.assertSaltTrueReturn(ret)
        contents_pre = _read_target()

        # Appending the same text again must be a no-op
        ret = self.run_state(
            'file.append', name=tmp_file_append, text=percent_text
        )
        self.assertSaltTrueReturn(ret)
        contents_post = _read_target()

        self.assertEqual(contents_pre, contents_post)
    except AssertionError:
        # Keep a copy of the file next to it before re-raising
        if os.path.exists(tmp_file_append):
            shutil.copy(tmp_file_append, tmp_file_append + '.bak')
        raise
@with_tempdir()
def test_issue_2401_file_comment(self, base_dir):
    '''
    Run a file.comment SLS template twice: the first run must comment
    the line, the second must report that it is already commented
    '''
    tmp_file = os.path.join(base_dir, 'issue-2041-comment.txt')
    with salt.utils.files.fopen(tmp_file, 'w') as handle:
        handle.write('hello\nworld\n')

    # SLS template applied through state.template_str
    template = '\n'.join([
        '{0}:'.format(tmp_file),
        ' file.comment:',
        ' - regex: ^world',
    ])

    try:
        first = self.run_function(
            'state.template_str', [template], timeout=120
        )
        self.assertSaltTrueReturn(first)
        self.assertNotInSaltComment('Pattern already commented', first)
        self.assertInSaltComment('Commented lines successfully', first)

        # On the second pass the line is already commented
        second = self.run_function(
            'state.template_str', [template], timeout=120
        )
        self.assertSaltTrueReturn(second)
        self.assertInSaltComment('Pattern already commented', second)
    except AssertionError:
        # Keep a copy of the file next to it before re-raising
        shutil.copy(tmp_file, tmp_file + '.bak')
        raise
@with_tempdir()
def test_issue_2379_file_append(self, base_dir):
    '''
    file.append must add a line even when the file already contains
    commented variants of that same line
    '''
    tmp_file = os.path.join(base_dir, 'issue-2379-file-append.txt')
    seed_lines = (
        'hello\nworld\n',             # Some junk
        '#PermitRootLogin yes\n',     # Commented text
        '# PermitRootLogin yes\n',    # Commented text with space
    )
    with salt.utils.files.fopen(tmp_file, 'w') as handle:
        handle.write(''.join(seed_lines))

    # SLS template applied through state.template_str
    template = '\n'.join([
        '{0}:'.format(tmp_file),
        ' file.append:',
        ' - text: PermitRootLogin yes',
    ])

    try:
        ret = self.run_function('state.template_str', [template])
        self.assertSaltTrueReturn(ret)
        self.assertInSaltComment('Appended 1 lines', ret)
    except AssertionError:
        # Keep a copy of the file next to it before re-raising
        shutil.copy(tmp_file, tmp_file + '.bak')
        raise
@skipIf(IS_WINDOWS, 'Mode not available in Windows')
@with_tempdir(create=False)
@with_tempdir(create=False)
def test_issue_2726_mode_kwarg(self, dir1, dir2):
    '''
    file.recurse must reject a 'mode' argument with a helpful message and
    accept 'dir_mode' / 'file_mode' instead
    '''
    # Wrong usage: 'mode' passed to file.recurse
    bad_mode_kwarg_testfile = os.path.join(
        dir1, 'bad_mode_kwarg', 'testfile'
    )
    bad_sls = os.linesep.join([
        '{0}:'.format(bad_mode_kwarg_testfile),
        ' file.recurse:',
        ' - source: salt://testfile',
        ' - mode: 644',
    ])
    ret = self.run_function('state.template_str', [bad_sls])
    self.assertSaltFalseReturn(ret)
    self.assertInSaltComment(
        "'mode' is not allowed in 'file.recurse'. Please use "
        "'file_mode' and 'dir_mode'.",
        ret
    )
    # The failure must not surface as a raw TypeError
    self.assertNotInSaltComment(
        'TypeError: managed() got multiple values for keyword '
        "argument 'mode'",
        ret
    )

    # Correct usage: dir_mode and file_mode
    good_mode_kwargs_testfile = os.path.join(
        dir2, 'good_mode_kwargs', 'testappend'
    )
    good_sls = os.linesep.join([
        '{0}:'.format(good_mode_kwargs_testfile),
        ' file.recurse:',
        ' - source: salt://testappend',
        ' - dir_mode: 744',
        ' - file_mode: 644',
    ])
    ret = self.run_function('state.template_str', [good_sls])
    self.assertSaltTrueReturn(ret)
@with_tempdir()
def test_issue_8343_accumulated_require_in(self, base_dir):
    '''
    Apply an SLS that feeds two file.blockreplace states from
    file.accumulated states via require_in, then compare the managed
    file with the expected content
    '''
    template_path = os.path.join(RUNTIME_VARS.TMP_STATE_TREE, 'issue-8343.sls')
    testcase_filedest = os.path.join(base_dir, 'issue-8343.txt')

    # Remove leftovers, if any, before writing the SLS
    for stale_path in (template_path, testcase_filedest):
        if os.path.exists(stale_path):
            os.remove(stale_path)

    sls_template = [
        '{0}:',
        ' file.managed:',
        ' - contents: |',
        ' #',
        '',
        'prepend-foo-accumulator-from-pillar:',
        ' file.accumulated:',
        ' - require_in:',
        ' - file: prepend-foo-management',
        ' - filename: {0}',
        ' - text: |',
        ' foo',
        '',
        'append-foo-accumulator-from-pillar:',
        ' file.accumulated:',
        ' - require_in:',
        ' - file: append-foo-management',
        ' - filename: {0}',
        ' - text: |',
        ' bar',
        '',
        'prepend-foo-management:',
        ' file.blockreplace:',
        ' - name: {0}',
        ' - marker_start: "#-- start salt managed zonestart -- PLEASE, DO NOT EDIT"',
        ' - marker_end: "#-- end salt managed zonestart --"',
        " - content: ''",
        ' - prepend_if_not_found: True',
        " - backup: '.bak'",
        ' - show_changes: True',
        '',
        'append-foo-management:',
        ' file.blockreplace:',
        ' - name: {0}',
        ' - marker_start: "#-- start salt managed zoneend -- PLEASE, DO NOT EDIT"',
        ' - marker_end: "#-- end salt managed zoneend --"',
        " - content: ''",
        ' - append_if_not_found: True',
        " - backup: '.bak2'",
        ' - show_changes: True',
        '']
    rendered_sls = os.linesep.join(sls_template).format(testcase_filedest)
    with salt.utils.files.fopen(template_path, 'w') as handle:
        handle.write(rendered_sls)

    ret = self.run_function('state.sls', mods='issue-8343')
    # Every individual state in the run must have succeeded
    for state_id, state_ret in six.iteritems(ret):
        self.assertSaltTrueReturn({state_id: state_ret})

    with salt.utils.files.fopen(testcase_filedest) as handle:
        contents = handle.read().split(os.linesep)

    expected = [
        '#-- start salt managed zonestart -- PLEASE, DO NOT EDIT',
        'foo',
        '#-- end salt managed zonestart --',
        '#',
        '#-- start salt managed zoneend -- PLEASE, DO NOT EDIT',
        'bar',
        '#-- end salt managed zoneend --',
        '']
    expected_native = [
        salt.utils.stringutils.to_str(line) for line in expected
    ]
    self.assertEqual(expected_native, contents)
@with_tempdir()
@skipIf(salt.utils.platform.is_darwin() and six.PY2, 'This test hangs on OS X on Py2')
def test_issue_11003_immutable_lazy_proxy_sum(self, base_dir):
    '''
    Apply an SLS where two file.accumulated states (one via require_in,
    one via watch_in) feed a single file.blockreplace, then check that
    the managed block holds exactly the accumulated lines
    '''
    # causes the Import-Module ServerManager error on Windows
    template_path = os.path.join(RUNTIME_VARS.TMP_STATE_TREE, 'issue-11003.sls')
    testcase_filedest = os.path.join(base_dir, 'issue-11003.txt')
    sls_template = [
        'a{0}:',
        ' file.absent:',
        ' - name: {0}',
        '',
        '{0}:',
        ' file.managed:',
        ' - contents: |',
        ' #',
        '',
        'test-acc1:',
        ' file.accumulated:',
        ' - require_in:',
        ' - file: final',
        ' - filename: {0}',
        ' - text: |',
        ' bar',
        '',
        'test-acc2:',
        ' file.accumulated:',
        ' - watch_in:',
        ' - file: final',
        ' - filename: {0}',
        ' - text: |',
        ' baz',
        '',
        'final:',
        ' file.blockreplace:',
        ' - name: {0}',
        ' - marker_start: "#-- start managed zone PLEASE, DO NOT EDIT"',
        ' - marker_end: "#-- end managed zone"',
        ' - content: \'\'',
        ' - append_if_not_found: True',
        ' - show_changes: True'
    ]
    rendered_sls = os.linesep.join(sls_template).format(testcase_filedest)
    with salt.utils.files.fopen(template_path, 'w') as handle:
        handle.write(rendered_sls)

    ret = self.run_function('state.sls', mods='issue-11003', timeout=600)
    # Every individual state in the run must have succeeded
    for state_id, state_ret in six.iteritems(ret):
        self.assertSaltTrueReturn({state_id: state_ret})

    with salt.utils.files.fopen(testcase_filedest) as handle:
        contents = handle.read().split(os.linesep)

    # Slice out the lines strictly between the two markers
    start_marker = '#-- start managed zone PLEASE, DO NOT EDIT'
    end_marker = '#-- end managed zone'
    begin = contents.index(start_marker) + 1
    end = contents.index(end_marker)
    block_contents = contents[begin:end]

    # Each expected line is removed once; nothing may be left over
    for item in ('', 'bar', 'baz'):
        block_contents.remove(item)
    self.assertEqual(block_contents, [])
@with_tempdir()
def test_issue_8947_utf8_sls(self, base_dir):
    '''
    Run an SLS containing utf-8 characters (file name and contents) and
    verify the comments, the diff and the final file content.

    This is more generic than just a file test. Feel free to move
    '''
    self.maxDiff = None
    korean_1 = '한국어 시험'
    korean_2 = '첫 번째 행'
    korean_3 = '마지막 행'
    test_file = os.path.join(base_dir, '{0}.txt'.format(korean_1))
    test_file_encoded = test_file
    template_path = os.path.join(RUNTIME_VARS.TMP_STATE_TREE, 'issue-8947.sls')
    on_windows = salt.utils.platform.is_windows()

    # Values substituted into the SLS text below
    fields = {
        'test_file': test_file,
        'korean_1': korean_1,
        'korean_2': korean_2,
        'korean_3': korean_3,
    }

    # create the sls template
    template = textwrap.dedent('''\
some-utf8-file-create:
file.managed:
- name: {test_file}
- contents: {korean_1}
- makedirs: True
- replace: True
- show_diff: True
some-utf8-file-create2:
file.managed:
- name: {test_file}
- contents: |
{korean_2}
{korean_1}
{korean_3}
- replace: True
- show_diff: True
'''.format(**fields))

    if not on_windows:
        # Outside Windows the content is additionally read back via cat
        template += textwrap.dedent('''\
some-utf8-file-content-test:
cmd.run:
- name: 'cat "{test_file}"'
- require:
- file: some-utf8-file-create2
'''.format(**fields))

    # Save template file
    with salt.utils.files.fopen(template_path, 'wb') as handle:
        handle.write(salt.utils.stringutils.to_bytes(template))

    try:
        result = self.run_function('state.sls', mods='issue-8947')
        if not isinstance(result, dict):
            raise AssertionError(
                ('Something went really wrong while testing this sls:'
                 ' {0}').format(repr(result))
            )

        # difflib produces different output on python 2.6 than on >=2.7
        if sys.version_info < (2, 7):
            diff = '--- \n+++ \n@@ -1,1 +1,3 @@\n'
        else:
            diff = '--- \n+++ \n@@ -1 +1,3 @@\n'
        diff += (
            '+첫 번째 행{0}'
            ' 한국어 시험{0}'
            '+마지막 행{0}'
        ).format(os.linesep)

        # Index the results by the second component of the state tag
        ret = {tag.split('_|-')[1]: data for tag, data in six.iteritems(result)}
        updated_comment = 'File {0} updated'.format(test_file_encoded)

        # Confirm initial creation of file
        created = ret['some-utf8-file-create']
        self.assertEqual(created['comment'], updated_comment)
        self.assertEqual(created['changes'], {'diff': 'New file'})

        # Confirm file was modified and that the diff was as expected
        modified = ret['some-utf8-file-create2']
        self.assertEqual(modified['comment'], updated_comment)
        self.assertEqual(modified['changes'], {'diff': diff})

        if on_windows:
            import subprocess
            import win32api
            p = subprocess.Popen(
                salt.utils.stringutils.to_str(
                    'type {}'.format(win32api.GetShortPathName(test_file))),
                shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            p.poll()
            out = p.stdout.read()
            self.assertEqual(
                out.decode('utf-8'),
                os.linesep.join((korean_2, korean_1, korean_3)) + os.linesep
            )
        else:
            content_test = ret['some-utf8-file-content-test']
            self.assertEqual(
                content_test['comment'],
                'Command "cat "{0}"" run'.format(test_file_encoded)
            )
            self.assertEqual(
                content_test['changes']['stdout'],
                '\n'.join((korean_2, korean_1, korean_3))
            )
    finally:
        # Best-effort removal of the SLS written above
        try:
            os.remove(template_path)
        except OSError:
            pass
@pytest.mark.skip_if_not_root
@skipIf(not HAS_PWD, "pwd not available. Skipping test")
@skipIf(not HAS_GRP, "grp not available. Skipping test")
@with_system_user_and_group(TEST_SYSTEM_USER, TEST_SYSTEM_GROUP,
                            on_existing='delete', delete=True)
@with_tempdir()
def test_issue_12209_follow_symlinks(self, tempdir, user, group):
    '''
    Recursive chown through file.directory with follow_symlinks=True:
    the link target is re-owned, the symlink itself is not
    '''
    # Layout: a real directory plus a symlink pointing at it
    onedir = os.path.join(tempdir, 'one')
    twodir = os.path.join(tempdir, 'two')
    os.mkdir(onedir)
    os.symlink(onedir, twodir)

    ret = self.run_state(
        'file.directory',
        name=tempdir,
        follow_symlinks=True,
        user=user,
        group=group,
        recurse=['user', 'group'],
    )
    self.assertSaltTrueReturn(ret)

    # Verify on disk rather than trusting the state result. With symlinks
    # being followed, the link itself is expected to stay owned by root
    # while the real directory belongs to the test user and group.
    onestats = os.stat(onedir)
    twostats = os.lstat(twodir)

    def _owner(stats):
        # User name for a stat result's uid
        return pwd.getpwuid(stats.st_uid).pw_name

    def _group(stats):
        # Group name for a stat result's gid
        return grp.getgrgid(stats.st_gid).gr_name

    self.assertEqual(_owner(onestats), user)
    self.assertEqual(_owner(twostats), 'root')
    self.assertEqual(_group(onestats), group)
    if salt.utils.path.which('id'):
        root_group = self.run_function('user.primary_group', ['root'])
        self.assertEqual(_group(twostats), root_group)
@pytest.mark.skip_if_not_root
@skipIf(not HAS_PWD, "pwd not available. Skipping test")
@skipIf(not HAS_GRP, "grp not available. Skipping test")
@with_system_user_and_group(TEST_SYSTEM_USER, TEST_SYSTEM_GROUP,
                            on_existing='delete', delete=True)
@with_tempdir()
def test_issue_12209_no_follow_symlinks(self, tempdir, user, group):
    '''
    Recursive chown through file.directory with follow_symlinks=False:
    both the real directory and the symlink itself are re-owned
    '''
    # Layout: a real directory plus a symlink pointing at it
    onedir = os.path.join(tempdir, 'one')
    twodir = os.path.join(tempdir, 'two')
    os.mkdir(onedir)
    os.symlink(onedir, twodir)

    ret = self.run_state(
        'file.directory',
        name=tempdir,
        follow_symlinks=False,
        user=user,
        group=group,
        recurse=['user', 'group'],
    )
    self.assertSaltTrueReturn(ret)

    # Verify on disk rather than trusting the state result. With symlinks
    # not being followed, the link itself is expected to be owned by the
    # test user and group, just like the real directory.
    onestats = os.stat(onedir)
    twostats = os.lstat(twodir)

    def _owner(stats):
        # User name for a stat result's uid
        return pwd.getpwuid(stats.st_uid).pw_name

    def _group(stats):
        # Group name for a stat result's gid
        return grp.getgrgid(stats.st_gid).gr_name

    self.assertEqual(_owner(onestats), user)
    self.assertEqual(_owner(twostats), user)
    self.assertEqual(_group(onestats), group)
    self.assertEqual(_group(twostats), group)
@with_tempfile(create=False)
@with_tempfile()
def test_template_local_file(self, source, dest):
    '''
    file.managed must accept a jinja template stored on the local
    filesystem, whether the source is written as a file:// URL or as a
    bare path.
    '''
    with salt.utils.files.fopen(source, 'w') as template:
        template.write('{{ foo }}\n')

    # First with the protocol designation prepended, then without it
    for src in ('file://' + source, source):
        ret = self.run_state(
            'file.managed',
            name=dest,
            source=src,
            template='jinja',
            context={'foo': 'Hello world!'},
        )
        self.assertSaltTrueReturn(ret)
@with_tempfile()
def test_template_local_file_noclobber(self, source):
    '''
    file.managed must fail, with an explanatory comment, when the local
    source path and the destination path are one and the same.
    '''
    with salt.utils.files.fopen(source, 'w') as template:
        template.write('{{ foo }}\n')

    state_kwargs = {
        'name': source,
        'source': source,
        'template': 'jinja',
        'context': {'foo': 'Hello world!'},
    }
    ret = self.run_state('file.managed', **state_kwargs)
    self.assertSaltFalseReturn(ret)

    # The return holds a single state entry; inspect its comment
    state_ret = ret[next(iter(ret))]
    self.assertIn(
        'Source file cannot be the same as destination',
        state_ret['comment'],
    )
@with_tempfile(create=False)
@with_tempfile(create=False)
def test_issue_25250_force_copy_deletes(self, source, dest):
    '''
    file.copy with force=True must leave the target in place, holding
    the same content as the source.
    '''
    # Seed source and destination with two different fixture files
    hosts_fixture = os.path.join(RUNTIME_VARS.FILES, 'hosts')
    shutil.copyfile(hosts_fixture, source)
    cheese_fixture = os.path.join(RUNTIME_VARS.FILES, 'file/base/cheese')
    shutil.copyfile(cheese_fixture, dest)

    self.run_state('file.copy', name=dest, source=source, force=True)

    self.assertTrue(os.path.exists(dest))
    self.assertTrue(filecmp.cmp(source, dest))

    for leftover in (source, dest):
        os.remove(leftover)
@skipIf(IS_WINDOWS, 'Windows does not report any file modes. Skipping.')
@pytest.mark.destructive_test
@with_tempfile()
def test_file_copy_make_dirs(self, source):
    '''
    file.copy with makedirs=True must apply the requested user and mode
    to the copied file and to the directories checked below.
    '''
    shutil.copyfile(os.path.join(RUNTIME_VARS.FILES, 'hosts'), source)

    outer_dir = os.path.join(RUNTIME_VARS.TMP, 'dir1')
    inner_dir = os.path.join(outer_dir, 'dir2')
    dest = os.path.join(inner_dir, 'copied_file.txt')
    user = 'salt'
    mode = '0644'

    added = self.run_function('user.add', [user])
    self.assertTrue(added, 'Failed to add user. Are you running as sudo?')

    ret = self.run_state(
        'file.copy',
        name=dest,
        source=source,
        user=user,
        makedirs=True,
        mode=mode,
    )
    self.assertSaltTrueReturn(ret)

    # The file and both directories are checked for owner and mode
    for path in (dest, outer_dir, inner_dir):
        owner = self.run_function('file.get_user', [path])
        reported_mode = self.run_function('file.get_mode', [path])
        self.assertEqual(owner, user)
        self.assertEqual(
            salt.utils.files.normalize_mode(reported_mode), mode)
def test_contents_pillar_with_pillar_list(self):
    '''
    Regression test for
    https://github.com/saltstack/salt/issues/30934

    Applies the 'file_contents_pillar' SLS and expects it to succeed.
    '''
    ret = self.run_function('state.sls', mods='file_contents_pillar')
    self.assertSaltTrueReturn(ret)
@pytest.mark.skip_if_not_root
@skipIf(not HAS_PWD, "pwd not available. Skipping test")
@skipIf(not HAS_GRP, "grp not available. Skipping test")
@with_system_user_and_group(TEST_SYSTEM_USER, TEST_SYSTEM_GROUP,
                            on_existing='delete', delete=True)
def test_owner_after_setuid(self, user, group):
    '''
    Test to check file user/group after setting setuid or setgid.
    Because Python os.chown() does reset the setuid/setgid to 0.
    https://github.com/saltstack/salt/pull/45257
    '''
    # Desired configuration.
    desired = {
        'file': os.path.join(RUNTIME_VARS.TMP, 'file_with_setuid'),
        'user': user,
        'group': group,
        'mode': '4750'
    }

    try:
        # Run the state.
        ret = self.run_state(
            'file.managed', name=desired['file'],
            user=desired['user'], group=desired['group'],
            mode=desired['mode']
        )
        # Check the state result before touching the file: if the state
        # failed, the file may not exist and os.stat() would raise an
        # OSError that hides the real failure.
        self.assertSaltTrueReturn(ret)

        # Collect what actually ended up on disk.
        file_stat = os.stat(desired['file'])
        result = {
            'user': pwd.getpwuid(file_stat.st_uid).pw_name,
            'group': grp.getgrgid(file_stat.st_gid).gr_name,
            'mode': oct(stat.S_IMODE(file_stat.st_mode))
        }

        self.assertEqual(desired['user'], result['user'])
        self.assertEqual(desired['group'], result['group'])
        # oct() prefixes the value with '0' or '0o' depending on the
        # Python version; strip the prefix before comparing.
        self.assertEqual(desired['mode'], result['mode'].lstrip('0Oo'))
    finally:
        # Best-effort cleanup so the setuid file does not linger in the
        # shared temp directory.
        try:
            os.remove(desired['file'])
        except OSError:
            pass
def test_binary_contents(self):
    '''
    Passing binary data as ``contents`` to file.managed must succeed
    instead of causing a traceback.
    '''
    gif_path = os.path.join(RUNTIME_VARS.TMP, '1px.gif')
    try:
        ret = self.run_state(
            'file.managed', name=gif_path, contents=BINARY_FILE)
        self.assertSaltTrueReturn(ret)
    finally:
        # Best-effort cleanup; the file may never have been created
        try:
            os.remove(gif_path)
        except OSError:
            pass
def test_binary_contents_twice(self):
    '''
    This test ensures that after a binary file is created, salt can confirm
    that the file is in the correct state.
    '''
    name = os.path.join(RUNTIME_VARS.TMP, '1px.gif')
    try:
        # First run state ensures file is created
        ret = self.run_state(
            'file.managed', name=name, contents=BINARY_FILE)
        self.assertSaltTrueReturn(ret)

        # Second run of state ensures file is in correct state
        ret = self.run_state(
            'file.managed', name=name, contents=BINARY_FILE)
        self.assertSaltTrueReturn(ret)
    finally:
        # Clean up even when an assertion above fails; the same path is
        # used by test_binary_contents, so a leftover file must not leak
        # from one test into the other.
        try:
            os.remove(name)
        except OSError:
            pass
@pytest.mark.skip_if_not_root
@skipIf(not HAS_PWD, "pwd not available. Skipping test")
@skipIf(not HAS_GRP, "grp not available. Skipping test")
@with_system_user_and_group(TEST_SYSTEM_USER, TEST_SYSTEM_GROUP,
                            on_existing='delete', delete=True)
@with_tempdir()
def test_issue_48336_file_managed_mode_setuid(self, tempdir, user, group):
    '''
    Ensure that the mode, including the setuid bit, is correct when
    file.managed also sets the owner and the group.
    '''
    target = os.path.join(tempdir, 'temp_file_issue_48336')

    ret = self.run_state(
        'file.managed',
        name=target,
        user=user,
        group=group,
        mode='4750',
    )
    self.assertSaltTrueReturn(ret)

    # Compare the normalized on-disk mode, then the owner and the group
    stats = os.stat(target)
    raw_mode = six.text_type(oct(stat.S_IMODE(stats.st_mode)))
    self.assertEqual(salt.utils.files.normalize_mode(raw_mode), '4750')
    self.assertEqual(pwd.getpwuid(stats.st_uid).pw_name, user)
    self.assertEqual(grp.getgrgid(stats.st_gid).gr_name, group)
@with_tempdir()
def test_issue_48557(self, tempdir):
    '''
    file.line in insert mode must place the new line right after the
    line named by ``after``; lines are compared using os.linesep.
    '''
    def as_bytes(*lines):
        # Every line, the last one included, is terminated by os.linesep
        return os.linesep.join(list(lines) + ['']).encode('utf-8')

    target = os.path.join(tempdir, 'temp_file_issue_48557')
    with salt.utils.files.fopen(target, 'wb') as handle:
        handle.write(as_bytes('test1', 'test2', 'test3'))

    ret = self.run_state(
        'file.line',
        name=target,
        after='test2',
        mode='insert',
        content='test4',
    )
    self.assertSaltTrueReturn(ret)

    with salt.utils.files.fopen(target, 'rb') as handle:
        on_disk = handle.read()
    self.assertEqual(
        on_disk, as_bytes('test1', 'test2', 'test4', 'test3'))
@with_tempfile()
def test_issue_50221(self, name):
    '''
    The 'issue-50221' pillar value is expected to be 'abc' followed by
    three line separators, and the file at ``name`` must hold the same
    text after the 'issue-50221' state has been applied.
    '''
    expected = 'abc' + os.linesep * 3

    pillar_value = self.run_function('pillar.get', ['issue-50221'])
    assert pillar_value == expected

    ret = self.run_function(
        'state.apply', ['issue-50221'], pillar={'name': name})
    self.assertSaltTrueReturn(ret)

    with salt.utils.files.fopen(name, 'r') as handle:
        written = handle.read()
    assert written == expected
def test_managed_file_issue_51208(self):
    '''
    Test to ensure we can handle a file with escaped double-quotes
    '''
    name = os.path.join(RUNTIME_VARS.TMP, 'issue_51208.txt')
    ret = self.run_state(
        'file.managed', name=name, source='salt://issue-51208/vimrc.stub'
    )
    # Check the state result first: if the state failed, the managed file
    # may be missing and opening it below would raise an error that hides
    # the actual failure.
    self.assertSaltTrueReturn(ret)

    # The managed copy must match the source file on the master.
    src = os.path.join(RUNTIME_VARS.BASE_FILES, 'issue-51208', 'vimrc.stub')
    with salt.utils.files.fopen(src, 'r') as fp_:
        master_data = fp_.read()
    with salt.utils.files.fopen(name, 'r') as fp_:
        minion_data = fp_.read()
    self.assertEqual(master_data, minion_data)
- @pytest.mark.windows_whitelisted
- class BlockreplaceTest(ModuleCase, SaltReturnAssertsMixin):
# Fixtures shared by the blockreplace tests: the two marker lines, the
# block content, and the file contents used as input or expected output.
- marker_start = '# start'
- marker_end = '# end'
# Text the tests pass as ``content`` to file.blockreplace.
- content = dedent(six.text_type('''\
- Line 1 of block
- Line 2 of block
- '''))
# NOTE(review): the *_explicit_* fixtures further down contain an empty
# line after 'Hello world!', which the dedent()-based fixtures here do not
# show, and with_matching_block_and_extra_newline reads identically to
# with_matching_block -- confirm that no blank lines were lost from these
# literals.
# File content that has no marker lines at all.
- without_block = dedent(six.text_type('''\
- Hello world!
- # comment here
- '''))
# File content with both markers around text that differs from ``content``.
- with_non_matching_block = dedent(six.text_type('''\
- Hello world!
- # start
- No match here
- # end
- # comment here
- '''))
# Same as above, but the end marker shares a line with the block text.
- with_non_matching_block_and_marker_end_not_after_newline = dedent(six.text_type('''\
- Hello world!
- # start
- No match here# end
- # comment here
- '''))
# File content with the markers around ``content``.
- with_matching_block = dedent(six.text_type('''\
- Hello world!
- # start
- Line 1 of block
- Line 2 of block
- # end
- # comment here
- '''))
# Expected result when append_newline=True is used with content that
# already ends in a newline (see test_non_matching_block_append_newline).
- with_matching_block_and_extra_newline = dedent(six.text_type('''\
- Hello world!
- # start
- Line 1 of block
- Line 2 of block
- # end
- # comment here
- '''))
# Expected result when append_newline=False is used with content that does
# not end in a newline: the end marker follows the block text directly.
- with_matching_block_and_marker_end_not_after_newline = dedent(six.text_type('''\
- Hello world!
- # start
- Line 1 of block
- Line 2 of block# end
- # comment here
- '''))
# Fixtures with hard-coded line endings, used by the
# *_auto_line_separator tests, which mix '\n' content with '\r\n' files
# and vice versa.
- content_explicit_posix_newlines = ('Line 1 of block\n'
- 'Line 2 of block\n')
- content_explicit_windows_newlines = ('Line 1 of block\r\n'
- 'Line 2 of block\r\n')
- without_block_explicit_posix_newlines = ('Hello world!\n\n'
- '# comment here\n')
- without_block_explicit_windows_newlines = ('Hello world!\r\n\r\n'
- '# comment here\r\n')
# Expected file content once the block has been prepended.
- with_block_prepended_explicit_posix_newlines = ('# start\n'
- 'Line 1 of block\n'
- 'Line 2 of block\n'
- '# end\n'
- 'Hello world!\n\n'
- '# comment here\n')
- with_block_prepended_explicit_windows_newlines = ('# start\r\n'
- 'Line 1 of block\r\n'
- 'Line 2 of block\r\n'
- '# end\r\n'
- 'Hello world!\r\n\r\n'
- '# comment here\r\n')
# Expected file content once the block has been appended.
- with_block_appended_explicit_posix_newlines = ('Hello world!\n\n'
- '# comment here\n'
- '# start\n'
- 'Line 1 of block\n'
- 'Line 2 of block\n'
- '# end\n')
- with_block_appended_explicit_windows_newlines = ('Hello world!\r\n\r\n'
- '# comment here\r\n'
- '# start\r\n'
- 'Line 1 of block\r\n'
- 'Line 2 of block\r\n'
- '# end\r\n')
@staticmethod
def _write(dest, content):
    '''
    Write ``content`` to the file at ``dest`` in binary mode, after
    converting it with salt.utils.stringutils.to_bytes.
    '''
    with salt.utils.files.fopen(dest, 'wb') as out_file:
        raw = salt.utils.stringutils.to_bytes(content)
        out_file.write(raw)
@staticmethod
def _read(src):
    '''
    Read the file at ``src`` in binary mode and return its content
    converted with salt.utils.stringutils.to_unicode.
    '''
    with salt.utils.files.fopen(src, 'rb') as in_file:
        raw = in_file.read()
        return salt.utils.stringutils.to_unicode(raw)
@with_tempfile()
def test_prepend(self, name):
    '''
    blockreplace with prepend_if_not_found=True on a file that does not
    contain the block yet.
    '''
    expected = (
        self.marker_start + os.linesep + self.content +
        self.marker_end + os.linesep + self.without_block
    )

    # Pass 1: content ends in a newline.
    # Pass 2: content does not end in a newline.
    for content in (self.content, self.content.rstrip('\r\n')):
        self._write(name, self.without_block)
        # The first run must change the file; the re-run must not.
        for check_changes in (self.assertTrue, self.assertFalse):
            ret = self.run_state(
                'file.blockreplace',
                name=name,
                content=content,
                marker_start=self.marker_start,
                marker_end=self.marker_end,
                prepend_if_not_found=True,
            )
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]['changes'])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_prepend_append_newline(self, name):
    '''
    blockreplace with prepend_if_not_found=True on a file that does not
    contain the block yet, with append_newline explicitly set to True.
    '''
    head = self.marker_start + os.linesep
    tail = self.marker_end + os.linesep + self.without_block
    passes = (
        # Pass 1: content ends in a newline, so one more is added
        (self.content, head + self.content + os.linesep + tail),
        # Pass 2: content does not end in a newline
        (self.content.rstrip('\r\n'), head + self.content + tail),
    )

    for content, expected in passes:
        self._write(name, self.without_block)
        # The first run must change the file; the re-run must not.
        for check_changes in (self.assertTrue, self.assertFalse):
            ret = self.run_state(
                'file.blockreplace',
                name=name,
                content=content,
                marker_start=self.marker_start,
                marker_end=self.marker_end,
                prepend_if_not_found=True,
                append_newline=True,
            )
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]['changes'])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_prepend_no_append_newline(self, name):
    '''
    blockreplace with prepend_if_not_found=True on a file that does not
    contain the block yet, with append_newline explicitly set to False.
    '''
    head = self.marker_start + os.linesep
    tail = self.marker_end + os.linesep + self.without_block
    stripped = self.content.rstrip('\r\n')
    passes = (
        # Pass 1: content ends in a newline
        (self.content, head + self.content + tail),
        # Pass 2: content does not end in a newline, so the end marker
        # follows the block text on the same line
        (stripped, head + stripped + tail),
    )

    for content, expected in passes:
        self._write(name, self.without_block)
        # The first run must change the file; the re-run must not.
        for check_changes in (self.assertTrue, self.assertFalse):
            ret = self.run_state(
                'file.blockreplace',
                name=name,
                content=content,
                marker_start=self.marker_start,
                marker_end=self.marker_end,
                prepend_if_not_found=True,
                append_newline=False,
            )
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]['changes'])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_append(self, name):
    '''
    blockreplace with append_if_not_found=True on a file that does not
    contain the block yet.
    '''
    expected = (
        self.without_block + self.marker_start + os.linesep +
        self.content + self.marker_end + os.linesep
    )

    # Pass 1: content ends in a newline.
    # Pass 2: content does not end in a newline.
    for content in (self.content, self.content.rstrip('\r\n')):
        self._write(name, self.without_block)
        # The first run must change the file; the re-run must not.
        for check_changes in (self.assertTrue, self.assertFalse):
            ret = self.run_state(
                'file.blockreplace',
                name=name,
                content=content,
                marker_start=self.marker_start,
                marker_end=self.marker_end,
                append_if_not_found=True,
            )
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]['changes'])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_append_append_newline(self, name):
    '''
    blockreplace with append_if_not_found=True on a file that does not
    contain the block yet, with append_newline explicitly set to True.
    '''
    head = self.without_block + self.marker_start + os.linesep
    tail = self.marker_end + os.linesep
    passes = (
        # Pass 1: content ends in a newline, so one more is added
        (self.content, head + self.content + os.linesep + tail),
        # Pass 2: content does not end in a newline
        (self.content.rstrip('\r\n'), head + self.content + tail),
    )

    for content, expected in passes:
        self._write(name, self.without_block)
        # The first run must change the file; the re-run must not.
        for check_changes in (self.assertTrue, self.assertFalse):
            ret = self.run_state(
                'file.blockreplace',
                name=name,
                content=content,
                marker_start=self.marker_start,
                marker_end=self.marker_end,
                append_if_not_found=True,
                append_newline=True,
            )
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]['changes'])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_append_no_append_newline(self, name):
    '''
    blockreplace with append_if_not_found=True on a file that does not
    contain the block yet, with append_newline explicitly set to False.
    '''
    head = self.without_block + self.marker_start + os.linesep
    tail = self.marker_end + os.linesep
    stripped = self.content.rstrip('\r\n')
    passes = (
        # Pass 1: content ends in a newline
        (self.content, head + self.content + tail),
        # Pass 2: content does not end in a newline, so the end marker
        # follows the block text on the same line
        (stripped, head + stripped + tail),
    )

    for content, expected in passes:
        self._write(name, self.without_block)
        # The first run must change the file; the re-run must not.
        for check_changes in (self.assertTrue, self.assertFalse):
            ret = self.run_state(
                'file.blockreplace',
                name=name,
                content=content,
                marker_start=self.marker_start,
                marker_end=self.marker_end,
                append_if_not_found=True,
                append_newline=False,
            )
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]['changes'])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_prepend_auto_line_separator(self, name):
    '''
    Line separator auto-detection when the block is prepended: the
    result is expected to use the line endings of the existing file,
    not those of the supplied content.
    '''
    # (initial file content, block content, expected file content)
    scenarios = (
        # POSIX newlines to Windows newlines
        (self.without_block_explicit_windows_newlines,
         self.content_explicit_posix_newlines,
         self.with_block_prepended_explicit_windows_newlines),
        # Windows newlines to POSIX newlines
        (self.without_block_explicit_posix_newlines,
         self.content_explicit_windows_newlines,
         self.with_block_prepended_explicit_posix_newlines),
    )

    for initial, content, expected in scenarios:
        self._write(name, initial)
        # The first run must change the file; the re-run must not.
        for check_changes in (self.assertTrue, self.assertFalse):
            ret = self.run_state(
                'file.blockreplace',
                name=name,
                content=content,
                marker_start=self.marker_start,
                marker_end=self.marker_end,
                prepend_if_not_found=True,
            )
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]['changes'])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_append_auto_line_separator(self, name):
    '''
    Line separator auto-detection when the block is appended: the
    result is expected to use the line endings of the existing file,
    not those of the supplied content.
    '''
    # (initial file content, block content, expected file content)
    scenarios = (
        # POSIX newlines to Windows newlines
        (self.without_block_explicit_windows_newlines,
         self.content_explicit_posix_newlines,
         self.with_block_appended_explicit_windows_newlines),
        # Windows newlines to POSIX newlines
        (self.without_block_explicit_posix_newlines,
         self.content_explicit_windows_newlines,
         self.with_block_appended_explicit_posix_newlines),
    )

    for initial, content, expected in scenarios:
        self._write(name, initial)
        # The first run must change the file; the re-run must not.
        for check_changes in (self.assertTrue, self.assertFalse):
            ret = self.run_state(
                'file.blockreplace',
                name=name,
                content=content,
                marker_start=self.marker_start,
                marker_end=self.marker_end,
                append_if_not_found=True,
            )
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]['changes'])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_non_matching_block(self, name):
    '''
    blockreplace on a file whose block exists but holds different
    content.
    '''
    # Pass 1: content ends in a newline.
    # Pass 2: content does not end in a newline.
    for content in (self.content, self.content.rstrip('\r\n')):
        self._write(name, self.with_non_matching_block)
        # The first run must change the file; the re-run must not.
        for check_changes in (self.assertTrue, self.assertFalse):
            ret = self.run_state(
                'file.blockreplace',
                name=name,
                content=content,
                marker_start=self.marker_start,
                marker_end=self.marker_end,
            )
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]['changes'])
            self.assertEqual(self._read(name), self.with_matching_block)
@with_tempfile()
def test_non_matching_block_append_newline(self, name):
    '''
    blockreplace on a file whose block exists but holds different
    content, with append_newline explicitly set to True.
    '''
    passes = (
        # Pass 1: content ends in a newline
        (self.content, self.with_matching_block_and_extra_newline),
        # Pass 2: content does not end in a newline
        (self.content.rstrip('\r\n'), self.with_matching_block),
    )

    for content, expected in passes:
        self._write(name, self.with_non_matching_block)
        # The first run must change the file; the re-run must not.
        for check_changes in (self.assertTrue, self.assertFalse):
            ret = self.run_state(
                'file.blockreplace',
                name=name,
                content=content,
                marker_start=self.marker_start,
                marker_end=self.marker_end,
                append_newline=True,
            )
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]['changes'])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_non_matching_block_no_append_newline(self, name):
    '''
    blockreplace on a file whose block exists but holds different
    content, with append_newline explicitly set to False.
    '''
    passes = (
        # Pass 1: content ends in a newline
        (self.content, self.with_matching_block),
        # Pass 2: content does not end in a newline
        (self.content.rstrip('\r\n'),
         self.with_matching_block_and_marker_end_not_after_newline),
    )

    for content, expected in passes:
        self._write(name, self.with_non_matching_block)
        # The first run must change the file; the re-run must not.
        for check_changes in (self.assertTrue, self.assertFalse):
            ret = self.run_state(
                'file.blockreplace',
                name=name,
                content=content,
                marker_start=self.marker_start,
                marker_end=self.marker_end,
                append_newline=False,
            )
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]['changes'])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_non_matching_block_and_marker_not_after_newline(self, name):
    '''
    Run blockreplace against a file whose marked block holds non-matching
    content and whose marker_end is not directly preceded by a newline.
    Both content variants must end up producing the matching block, and
    re-running the state must not report any changes.
    '''
    state_kwargs = {
        'name': name,
        'marker_start': self.marker_start,
        'marker_end': self.marker_end,
    }
    initial = self.with_non_matching_block_and_marker_end_not_after_newline
    # First with content ending in a newline, then without one
    for content in (self.content, self.content.rstrip('\r\n')):
        self._write(name, initial)
        # First run changes the file, the re-run must be a no-op
        for changes_expected in (True, False):
            ret = self.run_state('file.blockreplace',
                                 content=content,
                                 **state_kwargs)
            self.assertSaltTrueReturn(ret)
            changes = ret[next(iter(ret))]['changes']
            if changes_expected:
                self.assertTrue(changes)
            else:
                self.assertFalse(changes)
            self.assertEqual(self._read(name), self.with_matching_block)
@with_tempfile()
def test_non_matching_block_and_marker_not_after_newline_append_newline(self, name):
    '''
    Run blockreplace with append_newline=True against a file whose marked
    block holds non-matching content and whose marker_end is not directly
    preceded by a newline. Every scenario is applied twice; the second
    application must not report any changes.
    '''
    state_kwargs = {
        'name': name,
        'marker_start': self.marker_start,
        'marker_end': self.marker_end,
        'append_newline': True,
    }
    initial = self.with_non_matching_block_and_marker_end_not_after_newline
    # Each entry: (content given to the state, whether the first run must
    # report changes, file contents expected after every run)
    scenarios = (
        # Content ends in a newline
        (self.content, True, self.with_matching_block_and_extra_newline),
        # Content does not end in a newline
        (self.content.rstrip('\r\n'), True, self.with_matching_block),
    )
    for content, first_run_changes, expected in scenarios:
        self._write(name, initial)
        # The second iteration re-runs the identical state
        for changes_expected in (first_run_changes, False):
            ret = self.run_state('file.blockreplace',
                                 content=content,
                                 **state_kwargs)
            self.assertSaltTrueReturn(ret)
            changes = ret[next(iter(ret))]['changes']
            if changes_expected:
                self.assertTrue(changes)
            else:
                self.assertFalse(changes)
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_non_matching_block_and_marker_not_after_newline_no_append_newline(self, name):
    '''
    Run blockreplace with append_newline=False against a file whose marked
    block holds non-matching content and whose marker_end is not directly
    preceded by a newline. Every scenario is applied twice; the second
    application must not report any changes.
    '''
    state_kwargs = {
        'name': name,
        'marker_start': self.marker_start,
        'marker_end': self.marker_end,
        'append_newline': False,
    }
    initial = self.with_non_matching_block_and_marker_end_not_after_newline
    # Each entry: (content given to the state, whether the first run must
    # report changes, file contents expected after every run)
    scenarios = (
        # Content ends in a newline
        (self.content, True, self.with_matching_block),
        # Content does not end in a newline
        (self.content.rstrip('\r\n'),
         True,
         self.with_matching_block_and_marker_end_not_after_newline),
    )
    for content, first_run_changes, expected in scenarios:
        self._write(name, initial)
        # The second iteration re-runs the identical state
        for changes_expected in (first_run_changes, False):
            ret = self.run_state('file.blockreplace',
                                 content=content,
                                 **state_kwargs)
            self.assertSaltTrueReturn(ret)
            changes = ret[next(iter(ret))]['changes']
            if changes_expected:
                self.assertTrue(changes)
            else:
                self.assertFalse(changes)
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_matching_block(self, name):
    '''
    Run blockreplace against a file whose marked block already matches.
    Neither content variant, nor re-running the state, may report changes
    or alter the file.
    '''
    state_kwargs = {
        'name': name,
        'marker_start': self.marker_start,
        'marker_end': self.marker_end,
    }
    # First with content ending in a newline, then without one
    for content in (self.content, self.content.rstrip('\r\n')):
        self._write(name, self.with_matching_block)
        # Initial run plus one re-run, both expected to be no-ops
        for _ in range(2):
            ret = self.run_state('file.blockreplace',
                                 content=content,
                                 **state_kwargs)
            self.assertSaltTrueReturn(ret)
            self.assertFalse(ret[next(iter(ret))]['changes'])
            self.assertEqual(self._read(name), self.with_matching_block)
@with_tempfile()
def test_matching_block_append_newline(self, name):
    '''
    Run blockreplace with append_newline=True against a file whose marked
    block already matches. Content ending in a newline is expected to gain
    an extra newline (a change); content without a trailing newline is
    expected to leave the file untouched. Re-running never reports changes.
    '''
    state_kwargs = {
        'name': name,
        'marker_start': self.marker_start,
        'marker_end': self.marker_end,
        'append_newline': True,
    }
    # Each entry: (content given to the state, whether the first run must
    # report changes, file contents expected after every run)
    scenarios = (
        # Content ends in a newline
        (self.content, True, self.with_matching_block_and_extra_newline),
        # Content does not end in a newline
        (self.content.rstrip('\r\n'), False, self.with_matching_block),
    )
    for content, first_run_changes, expected in scenarios:
        self._write(name, self.with_matching_block)
        # The second iteration re-runs the identical state
        for changes_expected in (first_run_changes, False):
            ret = self.run_state('file.blockreplace',
                                 content=content,
                                 **state_kwargs)
            self.assertSaltTrueReturn(ret)
            changes = ret[next(iter(ret))]['changes']
            if changes_expected:
                self.assertTrue(changes)
            else:
                self.assertFalse(changes)
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_matching_block_no_append_newline(self, name):
    '''
    Run blockreplace with append_newline=False against a file whose marked
    block already matches. Content ending in a newline is expected to
    leave the file untouched; content without a trailing newline is
    expected to leave marker_end not directly preceded by a newline (a
    change). Re-running never reports changes.
    '''
    state_kwargs = {
        'name': name,
        'marker_start': self.marker_start,
        'marker_end': self.marker_end,
        'append_newline': False,
    }
    # Each entry: (content given to the state, whether the first run must
    # report changes, file contents expected after every run)
    scenarios = (
        # Content ends in a newline
        (self.content, False, self.with_matching_block),
        # Content does not end in a newline
        (self.content.rstrip('\r\n'),
         True,
         self.with_matching_block_and_marker_end_not_after_newline),
    )
    for content, first_run_changes, expected in scenarios:
        self._write(name, self.with_matching_block)
        # The second iteration re-runs the identical state
        for changes_expected in (first_run_changes, False):
            ret = self.run_state('file.blockreplace',
                                 content=content,
                                 **state_kwargs)
            self.assertSaltTrueReturn(ret)
            changes = ret[next(iter(ret))]['changes']
            if changes_expected:
                self.assertTrue(changes)
            else:
                self.assertFalse(changes)
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_matching_block_and_marker_not_after_newline(self, name):
    '''
    Run blockreplace against a file whose marked block matches but whose
    marker_end is not directly preceded by a newline. Both content
    variants must end up producing the matching block, and re-running the
    state must not report any changes.
    '''
    state_kwargs = {
        'name': name,
        'marker_start': self.marker_start,
        'marker_end': self.marker_end,
    }
    initial = self.with_matching_block_and_marker_end_not_after_newline
    # First with content ending in a newline, then without one
    for content in (self.content, self.content.rstrip('\r\n')):
        self._write(name, initial)
        # First run changes the file, the re-run must be a no-op
        for changes_expected in (True, False):
            ret = self.run_state('file.blockreplace',
                                 content=content,
                                 **state_kwargs)
            self.assertSaltTrueReturn(ret)
            changes = ret[next(iter(ret))]['changes']
            if changes_expected:
                self.assertTrue(changes)
            else:
                self.assertFalse(changes)
            self.assertEqual(self._read(name), self.with_matching_block)
@with_tempfile()
def test_matching_block_and_marker_not_after_newline_append_newline(self, name):
    '''
    Run blockreplace with append_newline=True against a file whose marked
    block matches but whose marker_end is not directly preceded by a
    newline. Content ending in a newline is expected to yield an extra
    newline; content without one is expected to yield the plain matching
    block. Re-running never reports changes.
    '''
    state_kwargs = {
        'name': name,
        'marker_start': self.marker_start,
        'marker_end': self.marker_end,
        'append_newline': True,
    }
    initial = self.with_matching_block_and_marker_end_not_after_newline
    # Each entry: (content given to the state, whether the first run must
    # report changes, file contents expected after every run)
    scenarios = (
        # Content ends in a newline
        (self.content, True, self.with_matching_block_and_extra_newline),
        # Content does not end in a newline
        (self.content.rstrip('\r\n'), True, self.with_matching_block),
    )
    for content, first_run_changes, expected in scenarios:
        self._write(name, initial)
        # The second iteration re-runs the identical state
        for changes_expected in (first_run_changes, False):
            ret = self.run_state('file.blockreplace',
                                 content=content,
                                 **state_kwargs)
            self.assertSaltTrueReturn(ret)
            changes = ret[next(iter(ret))]['changes']
            if changes_expected:
                self.assertTrue(changes)
            else:
                self.assertFalse(changes)
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_matching_block_and_marker_not_after_newline_no_append_newline(self, name):
    '''
    Run blockreplace with append_newline=False against a file whose marked
    block matches but whose marker_end is not directly preceded by a
    newline. Content ending in a newline is expected to change the file to
    the plain matching block; content without one is expected to leave the
    file as it is. Re-running never reports changes.
    '''
    state_kwargs = {
        'name': name,
        'marker_start': self.marker_start,
        'marker_end': self.marker_end,
        'append_newline': False,
    }
    initial = self.with_matching_block_and_marker_end_not_after_newline
    # Each entry: (content given to the state, whether the first run must
    # report changes, file contents expected after every run)
    scenarios = (
        # Content ends in a newline
        (self.content, True, self.with_matching_block),
        # Content does not end in a newline
        (self.content.rstrip('\r\n'),
         False,
         self.with_matching_block_and_marker_end_not_after_newline),
    )
    for content, first_run_changes, expected in scenarios:
        self._write(name, initial)
        # The second iteration re-runs the identical state
        for changes_expected in (first_run_changes, False):
            ret = self.run_state('file.blockreplace',
                                 content=content,
                                 **state_kwargs)
            self.assertSaltTrueReturn(ret)
            changes = ret[next(iter(ret))]['changes']
            if changes_expected:
                self.assertTrue(changes)
            else:
                self.assertFalse(changes)
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_issue_49043(self, name):
    '''
    Apply the ``issue-49043`` SLS, passing ``name`` in as pillar data, and
    check the diff reported by its blockreplace state. The expected diff
    contains non-ASCII characters.
    '''
    ret = self.run_function(
        'state.sls',
        mods='issue-49043',
        pillar={'name': name},
    )
    # Diagnostic output only, so it does not belong at error level
    log.debug('ret = %r', ret)
    expected_diff = '--- \n+++ \n@@ -0,0 +1,3 @@\n' + dedent('''\
        +#-- start managed zone --
        +äöü
        +#-- end managed zone --
        ''')
    job = 'file_|-somefile-blockreplace_|-{}_|-blockreplace'.format(name)
    self.assertEqual(ret[job]['changes']['diff'], expected_diff)
@pytest.mark.windows_whitelisted
class RemoteFileTest(ModuleCase, SaltReturnAssertsMixin):
    '''
    Uses a local tornado webserver to test http(s) file.managed states with and
    without skip_verify
    '''
    @classmethod
    def setUpClass(cls):
        '''
        Start the webserver and record the source URL and its expected hash
        '''
        cls.webserver = Webserver()
        cls.webserver.start()
        cls.source = cls.webserver.url('grail/scene33')
        if IS_WINDOWS:
            # CRLF vs LF causes a different hash on windows
            cls.source_hash = '21438b3d5fd2c0028bcab92f7824dc69'
        else:
            cls.source_hash = 'd2feb3beb323c79fc7a0f44f1408b4a3'

    @classmethod
    def tearDownClass(cls):
        '''
        Stop the webserver started in setUpClass
        '''
        cls.webserver.stop()

    @with_tempfile(create=False)
    def setUp(self, name):  # pylint: disable=arguments-differ
        '''
        Remember the temp file path that each test uses as its target
        '''
        self.name = name

    def tearDown(self):
        '''
        Remove the target file; a file that does not exist is not an error
        '''
        try:
            os.remove(self.name)
        except OSError as exc:
            if exc.errno != errno.ENOENT:
                raise

    def run_state(self, *args, **kwargs):
        '''
        Run the state through the parent class and log its return data
        '''
        ret = super(RemoteFileTest, self).run_state(*args, **kwargs)
        log.debug('ret = %s', ret)
        return ret

    def test_file_managed_http_source_no_hash(self):
        '''
        Test a remote file with no hash
        '''
        ret = self.run_state('file.managed',
                             name=self.name,
                             source=self.source,
                             skip_verify=False)
        # This should fail because no hash was provided
        self.assertSaltFalseReturn(ret)

    def test_file_managed_http_source(self):
        '''
        Test a remote file with a source_hash provided
        '''
        ret = self.run_state('file.managed',
                             name=self.name,
                             source=self.source,
                             source_hash=self.source_hash,
                             skip_verify=False)
        self.assertSaltTrueReturn(ret)

    def test_file_managed_http_source_skip_verify(self):
        '''
        Test a remote file using skip_verify
        '''
        ret = self.run_state('file.managed',
                             name=self.name,
                             source=self.source,
                             skip_verify=True)
        self.assertSaltTrueReturn(ret)

    def test_file_managed_keep_source_false_http(self):
        '''
        This test ensures that we properly clean the cached file if keep_source
        is set to False, for source files using an http:// URL
        '''
        # Run the state
        ret = self.run_state('file.managed',
                             name=self.name,
                             source=self.source,
                             source_hash=self.source_hash,
                             keep_source=False)
        ret = ret[next(iter(ret))]
        assert ret['result'] is True

        # Now make sure that the file is not cached
        result = self.run_function('cp.is_cached', [self.source])
        assert result == '', 'File is still cached at {0}'.format(result)
- @skipIf(not salt.utils.path.which('patch'), 'patch is not installed')
- @pytest.mark.windows_whitelisted
- class PatchTest(ModuleCase, SaltReturnAssertsMixin):
def _check_patch_version(self, min_version):
    '''
    Skip the running test unless the installed patch is at least
    ``min_version``.

    min_version
        Lowest acceptable version, as a string (e.g. ``'2.6'``)
    '''
    output = self.run_function('cmd.run', ['patch --version'])
    lines = output.splitlines()
    banner = lines[0] if lines else ''
    # The banner reads "patch X.Y.Z" on some builds and "GNU patch X.Y.Z"
    # on others, so find the version by shape rather than by position.
    version = next(
        (word for word in banner.split() if word[:1].isdigit()),
        None
    )
    if version is None:
        self.skipTest(
            'Unable to determine patch version from: {0!r}'.format(banner)
        )
    if _LooseVersion(version) < _LooseVersion(min_version):
        self.skipTest('Minimum patch version required: {0}. '
                      'Patch version installed: {1}'.format(min_version, version))
@classmethod
def setUpClass(cls):
    '''
    Start the webserver and precompute, for the numbers/math/all patches
    and their jinja templates, the file names, relative paths, salt://
    URLs, webserver URLs and hashes that the tests refer to.
    '''
    cls.webserver = Webserver()
    cls.webserver.start()

    rel_dir = 'patches/'
    scheme = 'salt://'
    template_ext = '.jinja'
    url_for = cls.webserver.url
    local_dir = os.path.join(RUNTIME_VARS.FILES, 'file', 'base', 'patches')

    def _hash_of(filename):
        # Hash of the named file inside the local patches directory
        return salt.utils.hashutils.get_hash(os.path.join(local_dir, filename))

    # File names
    cls.numbers_patch_name = 'numbers.patch'
    cls.math_patch_name = 'math.patch'
    cls.all_patch_name = 'all.patch'
    cls.numbers_patch_template_name = cls.numbers_patch_name + template_ext
    cls.math_patch_template_name = cls.math_patch_name + template_ext
    cls.all_patch_template_name = cls.all_patch_name + template_ext

    # Paths relative to the fileserver / webserver root
    cls.numbers_patch_path = rel_dir + cls.numbers_patch_name
    cls.math_patch_path = rel_dir + cls.math_patch_name
    cls.all_patch_path = rel_dir + cls.all_patch_name
    cls.numbers_patch_template_path = rel_dir + cls.numbers_patch_template_name
    cls.math_patch_template_path = rel_dir + cls.math_patch_template_name
    cls.all_patch_template_path = rel_dir + cls.all_patch_template_name

    # salt:// URLs
    cls.numbers_patch = scheme + cls.numbers_patch_path
    cls.math_patch = scheme + cls.math_patch_path
    cls.all_patch = scheme + cls.all_patch_path
    cls.numbers_patch_template = scheme + cls.numbers_patch_template_path
    cls.math_patch_template = scheme + cls.math_patch_template_path
    cls.all_patch_template = scheme + cls.all_patch_template_path

    # URLs served by the webserver
    cls.numbers_patch_http = url_for(cls.numbers_patch_path)
    cls.math_patch_http = url_for(cls.math_patch_path)
    cls.all_patch_http = url_for(cls.all_patch_path)
    cls.numbers_patch_template_http = url_for(cls.numbers_patch_template_path)
    cls.math_patch_template_http = url_for(cls.math_patch_template_path)
    cls.all_patch_template_http = url_for(cls.all_patch_template_path)

    # Hashes of the local copies
    cls.numbers_patch_hash = _hash_of(cls.numbers_patch_name)
    cls.math_patch_hash = _hash_of(cls.math_patch_name)
    cls.all_patch_hash = _hash_of(cls.all_patch_name)
    cls.numbers_patch_template_hash = _hash_of(cls.numbers_patch_template_name)
    cls.math_patch_template_hash = _hash_of(cls.math_patch_template_name)
    cls.all_patch_template_hash = _hash_of(cls.all_patch_template_name)

    # Context handed to the jinja-templated patches
    cls.context = {'two': 'two', 'ten': 10}
@classmethod
def tearDownClass(cls):
    '''
    Stop the webserver that setUpClass started
    '''
    server = cls.webserver
    server.stop()
def setUp(self):
    '''
    Create a new unpatched set of files
    '''
    self.base_dir = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP)
    # Register the cleanup right away so the directory is removed even if
    # one of the steps below raises
    self.addCleanup(shutil.rmtree, self.base_dir, ignore_errors=True)
    os.makedirs(os.path.join(self.base_dir, 'foo', 'bar'))
    self.numbers_file = os.path.join(self.base_dir, 'foo', 'numbers.txt')
    self.math_file = os.path.join(self.base_dir, 'foo', 'bar', 'math.txt')
    with salt.utils.files.fopen(self.numbers_file, 'w') as fp_:
        fp_.write(textwrap.dedent('''\
            one
            two
            three
            1
            2
            3
            '''))
    with salt.utils.files.fopen(self.math_file, 'w') as fp_:
        fp_.write(textwrap.dedent('''\
            Five plus five is ten
            Four squared is sixteen
            '''))
def test_patch_single_file(self):
    '''
    Apply a patch to a single file with file.patch, then apply it again
    and confirm that the second run makes no changes.
    '''
    state_args = {'name': self.numbers_file, 'source': self.numbers_patch}

    # First run applies the patch
    applied = self.run_state('file.patch', **state_args)
    self.assertSaltTrueReturn(applied)
    applied = applied[next(iter(applied))]
    self.assertEqual(applied['comment'], 'Patch successfully applied')

    # Second run should succeed, report that the patch was already
    # applied, and make no changes
    repeated = self.run_state('file.patch', **state_args)
    self.assertSaltTrueReturn(repeated)
    repeated = repeated[next(iter(repeated))]
    self.assertEqual(repeated['comment'], 'Patch was already applied')
    self.assertEqual(repeated['changes'], {})
def test_patch_directory(self):
    '''
    Apply a multi-file patch to a directory with file.patch, then apply it
    again and confirm that the second run makes no changes.
    '''
    self._check_patch_version('2.6')
    state_args = {
        'name': self.base_dir,
        'source': self.all_patch,
        'strip': 1,
    }

    # First run applies the patch
    applied = self.run_state('file.patch', **state_args)
    self.assertSaltTrueReturn(applied)
    applied = applied[next(iter(applied))]
    self.assertEqual(applied['comment'], 'Patch successfully applied')

    # Second run should succeed, report that the patch was already
    # applied, and make no changes
    repeated = self.run_state('file.patch', **state_args)
    self.assertSaltTrueReturn(repeated)
    repeated = repeated[next(iter(repeated))]
    self.assertEqual(repeated['comment'], 'Patch was already applied')
    self.assertEqual(repeated['changes'], {})
def test_patch_strip_parsing(self):
    '''
    Test that -p/--strip is parsed successfully when it is passed through
    the options argument, in each of its spellings.
    '''
    self._check_patch_version('2.6')
    # Each entry: (options string, expected comment, whether the run must
    # report empty changes)
    runs = (
        ('-p1', 'Patch successfully applied', False),
        ('--strip=1', 'Patch was already applied', True),
        ('--strip 1', 'Patch was already applied', True),
    )
    for options, comment, expect_no_changes in runs:
        ret = self.run_state(
            'file.patch',
            name=self.base_dir,
            source=self.all_patch,
            options=options,
        )
        self.assertSaltTrueReturn(ret)
        ret = ret[next(iter(ret))]
        self.assertEqual(ret['comment'], comment)
        if expect_no_changes:
            self.assertEqual(ret['changes'], {})
def test_patch_saltenv(self):
    '''
    Test that the patch is looked up in a non-base saltenv
    '''
    source = self.math_patch
    # The state is expected to fail since no patch file exists in that
    # environment. That is fine: the comment shows which environment was
    # searched, which is all this test cares about.
    ret = self.run_state(
        'file.patch',
        name=self.math_file,
        source=source,
        saltenv='prod',
    )
    self.assertSaltFalseReturn(ret)
    state_ret = ret[next(iter(ret))]
    expected_comment = \
        "Source file {0} not found in saltenv 'prod'".format(source)
    self.assertEqual(state_ret['comment'], expected_comment)
def test_patch_single_file_failure(self):
    '''
    Test file.patch using a patch applied to a single file. This tests a
    failed patch.
    '''
    # Empty the file to ensure that the patch doesn't apply cleanly
    with salt.utils.files.fopen(self.numbers_file, 'w'):
        pass

    ret = self.run_state(
        'file.patch',
        name=self.numbers_file,
        source=self.numbers_patch,
    )
    self.assertSaltFalseReturn(ret)
    ret = ret[next(iter(ret))]
    self.assertIn('Patch would not apply cleanly', ret['comment'])

    # Test the reject_file option and ensure that the rejects are written
    # to the path specified.
    reject_file = salt.utils.files.mkstemp()

    def _remove_reject_file():
        # Nothing else in this test removes the reject file; a file that
        # is already gone is not an error
        try:
            os.remove(reject_file)
        except OSError as exc:
            if exc.errno != errno.ENOENT:
                raise

    self.addCleanup(_remove_reject_file)

    ret = self.run_state(
        'file.patch',
        name=self.numbers_file,
        source=self.numbers_patch,
        reject_file=reject_file,
        strip=1,
    )
    self.assertSaltFalseReturn(ret)
    ret = ret[next(iter(ret))]
    self.assertIn('Patch would not apply cleanly', ret['comment'])
    self.assertIn(
        'saving rejects to file {0}'.format(reject_file),
        ret['comment']
    )
def test_patch_directory_failure(self):
    '''
    Test file.patch using a patch applied to a directory, with changes
    spanning multiple files. This tests a failed patch.
    '''
    # Empty the file to ensure that the patch doesn't apply
    with salt.utils.files.fopen(self.math_file, 'w'):
        pass

    ret = self.run_state(
        'file.patch',
        name=self.base_dir,
        source=self.all_patch,
        strip=1,
    )
    self.assertSaltFalseReturn(ret)
    ret = ret[next(iter(ret))]
    self.assertIn('Patch would not apply cleanly', ret['comment'])

    # Test the reject_file option and ensure that the rejects are written
    # to the path specified.
    reject_file = salt.utils.files.mkstemp()

    def _remove_reject_file():
        # Nothing else in this test removes the reject file; a file that
        # is already gone is not an error
        try:
            os.remove(reject_file)
        except OSError as exc:
            if exc.errno != errno.ENOENT:
                raise

    self.addCleanup(_remove_reject_file)

    ret = self.run_state(
        'file.patch',
        name=self.base_dir,
        source=self.all_patch,
        reject_file=reject_file,
        strip=1,
    )
    self.assertSaltFalseReturn(ret)
    ret = ret[next(iter(ret))]
    self.assertIn('Patch would not apply cleanly', ret['comment'])
    self.assertIn(
        'saving rejects to file {0}'.format(reject_file),
        ret['comment']
    )
def test_patch_single_file_remote_source(self):
    '''
    Apply a patch fetched from a remote (http) source to a single file,
    covering the missing-hash, source_hash and skip_verify cases.
    '''
    common = {'name': self.math_file, 'source': self.math_patch_http}

    # Neither source_hash nor skip_verify=True: expected to fail with a
    # message about the source hash
    unverified = self.run_state('file.patch', **common)
    self.assertSaltFalseReturn(unverified)
    unverified = unverified[next(iter(unverified))]
    self.assertIn('Unable to verify upstream hash', unverified['comment'])

    # With a source hash the state should now succeed
    hashed = self.run_state(
        'file.patch',
        source_hash=self.math_patch_hash,
        **common
    )
    self.assertSaltTrueReturn(hashed)
    hashed = hashed[next(iter(hashed))]
    self.assertEqual(hashed['comment'], 'Patch successfully applied')

    # No hash but skip_verify=True, to test skipping hash verification
    skipped = self.run_state('file.patch', skip_verify=True, **common)
    self.assertSaltTrueReturn(skipped)
    skipped = skipped[next(iter(skipped))]
    self.assertEqual(skipped['comment'], 'Patch was already applied')
    self.assertEqual(skipped['changes'], {})
def test_patch_directory_remote_source(self):
    '''
    Apply a multi-file patch fetched from a remote (http) source to a
    directory, covering the missing-hash, source_hash and skip_verify
    cases.
    '''
    self._check_patch_version('2.6')
    common = {
        'name': self.base_dir,
        'source': self.all_patch_http,
        'strip': 1,
    }

    # Neither source_hash nor skip_verify=True: expected to fail with a
    # message about the source hash
    unverified = self.run_state('file.patch', **common)
    self.assertSaltFalseReturn(unverified)
    unverified = unverified[next(iter(unverified))]
    self.assertIn('Unable to verify upstream hash', unverified['comment'])

    # With a source hash the state should now succeed
    hashed = self.run_state(
        'file.patch',
        source_hash=self.all_patch_hash,
        **common
    )
    self.assertSaltTrueReturn(hashed)
    hashed = hashed[next(iter(hashed))]
    self.assertEqual(hashed['comment'], 'Patch successfully applied')

    # No hash but skip_verify=True, to test skipping hash verification
    skipped = self.run_state('file.patch', skip_verify=True, **common)
    self.assertSaltTrueReturn(skipped)
    skipped = skipped[next(iter(skipped))]
    self.assertEqual(skipped['comment'], 'Patch was already applied')
    self.assertEqual(skipped['changes'], {})
def test_patch_single_file_template(self):
    '''
    Apply a jinja-templated patch to a single file with file.patch, then
    apply it again and confirm that the second run makes no changes.
    '''
    state_args = {
        'name': self.numbers_file,
        'source': self.numbers_patch_template,
        'template': 'jinja',
        'context': self.context,
    }

    # First run applies the patch
    applied = self.run_state('file.patch', **state_args)
    self.assertSaltTrueReturn(applied)
    applied = applied[next(iter(applied))]
    self.assertEqual(applied['comment'], 'Patch successfully applied')

    # Second run should succeed, report that the patch was already
    # applied, and make no changes
    repeated = self.run_state('file.patch', **state_args)
    self.assertSaltTrueReturn(repeated)
    repeated = repeated[next(iter(repeated))]
    self.assertEqual(repeated['comment'], 'Patch was already applied')
    self.assertEqual(repeated['changes'], {})
def test_patch_directory_template(self):
    '''
    Run file.patch against a directory using a patch that touches several
    files, where the patch file is rendered through jinja before being
    applied.
    '''
    self._check_patch_version('2.6')

    # Arguments shared by both runs below
    patch_args = {
        'name': self.base_dir,
        'source': self.all_patch_template,
        'template': 'jinja',
        'context': self.context,
        'strip': 1,
    }

    # First run applies the rendered patch.
    first_run = self.run_state('file.patch', **patch_args)
    self.assertSaltTrueReturn(first_run)
    first_ret = first_run[next(iter(first_run))]
    self.assertEqual(first_ret['comment'], 'Patch successfully applied')

    # Second run with identical arguments: the state should still succeed,
    # report that the patch was already applied, and make no changes.
    second_run = self.run_state('file.patch', **patch_args)
    self.assertSaltTrueReturn(second_run)
    second_ret = second_run[next(iter(second_run))]
    self.assertEqual(second_ret['comment'], 'Patch was already applied')
    self.assertEqual(second_ret['changes'], {})
def test_patch_single_file_remote_source_template(self):
    '''
    Run file.patch against a single file using a jinja-templated patch
    file fetched from a remote source. Covers the three source
    verification cases: no hash, explicit hash, and skip_verify.
    '''
    # Arguments shared by every run below
    patch_args = {
        'name': self.math_file,
        'source': self.math_patch_template_http,
        'template': 'jinja',
        'context': self.context,
    }

    # Neither source_hash nor skip_verify=True is passed, so the state
    # must fail and complain about the missing hash.
    unverified = self.run_state('file.patch', **patch_args)
    self.assertSaltFalseReturn(unverified)
    unverified_ret = unverified[next(iter(unverified))]
    self.assertIn('Unable to verify upstream hash', unverified_ret['comment'])

    # Supplying the source hash lets the patch go through.
    hashed = self.run_state(
        'file.patch',
        source_hash=self.math_patch_template_hash,
        **patch_args
    )
    self.assertSaltTrueReturn(hashed)
    hashed_ret = hashed[next(iter(hashed))]
    self.assertEqual(hashed_ret['comment'], 'Patch successfully applied')

    # Drop the hash and pass skip_verify=True instead. The patch is
    # already in place, so this run should report no changes.
    skipped = self.run_state(
        'file.patch',
        skip_verify=True,
        **patch_args
    )
    self.assertSaltTrueReturn(skipped)
    skipped_ret = skipped[next(iter(skipped))]
    self.assertEqual(skipped_ret['comment'], 'Patch was already applied')
    self.assertEqual(skipped_ret['changes'], {})
def test_patch_directory_remote_source_template(self):
    '''
    Run file.patch against a directory using a jinja-templated patch that
    touches several files, with the patch file fetched from a remote
    source. Covers the three source verification cases: no hash, explicit
    hash, and skip_verify.
    '''
    self._check_patch_version('2.6')

    # Arguments shared by every run below
    patch_args = {
        'name': self.base_dir,
        'source': self.all_patch_template_http,
        'template': 'jinja',
        'context': self.context,
        'strip': 1,
    }

    # Neither source_hash nor skip_verify=True is passed, so the state
    # must fail and complain about the missing hash.
    unverified = self.run_state('file.patch', **patch_args)
    self.assertSaltFalseReturn(unverified)
    unverified_ret = unverified[next(iter(unverified))]
    self.assertIn('Unable to verify upstream hash', unverified_ret['comment'])

    # Supplying the source hash lets the patch go through.
    hashed = self.run_state(
        'file.patch',
        source_hash=self.all_patch_template_hash,
        **patch_args
    )
    self.assertSaltTrueReturn(hashed)
    hashed_ret = hashed[next(iter(hashed))]
    self.assertEqual(hashed_ret['comment'], 'Patch successfully applied')

    # Drop the hash and pass skip_verify=True instead. The patch is
    # already in place, so this run should report no changes.
    skipped = self.run_state(
        'file.patch',
        skip_verify=True,
        **patch_args
    )
    self.assertSaltTrueReturn(skipped)
    skipped_ret = skipped[next(iter(skipped))]
    self.assertEqual(skipped_ret['comment'], 'Patch was already applied')
    self.assertEqual(skipped_ret['changes'], {})
def test_patch_test_mode(self):
    '''
    Test file.patch using test=True
    '''
    # Arguments shared by every run below
    patch_args = {
        'name': self.numbers_file,
        'source': self.numbers_patch,
    }

    # Dry run against the unpatched file: the result should be None and
    # the pending changes should be reported.
    dry_run = self.run_state('file.patch', test=True, **patch_args)
    self.assertSaltNoneReturn(dry_run)
    dry_ret = dry_run[next(iter(dry_run))]
    self.assertEqual(dry_ret['comment'], 'The patch would be applied')
    self.assertTrue(dry_ret['changes'])

    # Now apply the patch for real, so that the next dry run can confirm
    # that test=True on an already-applied patch yields True rather than
    # None.
    real_run = self.run_state('file.patch', **patch_args)
    self.assertSaltTrueReturn(real_run)
    real_ret = real_run[next(iter(real_run))]
    self.assertEqual(real_ret['comment'], 'Patch successfully applied')
    self.assertTrue(real_ret['changes'])

    # Dry run again. The pre-check runs before the __opts__['test'] check,
    # so the outcome should be a True result, exactly as it would be for
    # an already-patched file *without* test=True.
    repeat_dry_run = self.run_state('file.patch', test=True, **patch_args)
    self.assertSaltTrueReturn(repeat_dry_run)
    repeat_ret = repeat_dry_run[next(iter(repeat_dry_run))]
    self.assertEqual(repeat_ret['comment'], 'Patch was already applied')
    self.assertEqual(repeat_ret['changes'], {})

    # Truncate the target file so that the patch can no longer apply
    # cleanly.
    with salt.utils.files.fopen(self.numbers_file, 'w'):
        pass

    # One more dry run. As above, the state should return before reaching
    # the __opts__['test'] check; this time with a False result, because
    # by then it is already known that the patch will not apply cleanly.
    broken_dry_run = self.run_state('file.patch', test=True, **patch_args)
    self.assertSaltFalseReturn(broken_dry_run)
    broken_ret = broken_dry_run[next(iter(broken_dry_run))]
    self.assertIn('Patch would not apply cleanly', broken_ret['comment'])
    self.assertEqual(broken_ret['changes'], {})
# Scratch file used by WinFileTest: created with file.managed in setUp and
# removed with file.absent in tearDown.
WIN_TEST_FILE = 'c:/testfile'
@pytest.mark.destructive_test
@skipIf(not IS_WINDOWS, 'windows test only')
@pytest.mark.windows_whitelisted
class WinFileTest(ModuleCase):
    '''
    Test for the file state on Windows
    '''
    def setUp(self):
        '''
        Create the scratch file that every test in this class works on.
        '''
        self.run_state('file.managed', name=WIN_TEST_FILE, makedirs=True, contents='Only a test')

    def tearDown(self):
        '''
        Remove the scratch file created in setUp.
        '''
        self.run_state('file.absent', name=WIN_TEST_FILE)

    def _assert_state_succeeded(self, ret):
        '''
        Fail unless every state contained in ``ret`` reports a True result.

        run_state hands back a dict that maps each executed state to its
        return dict. Such a dict is truthy whether or not the state passed,
        so a bare assertTrue(ret) cannot detect a failed state; the
        per-state ``result`` value has to be inspected instead.
        '''
        # Anything other than a dict means the run did not produce state
        # returns at all; show the raw value in the failure message.
        self.assertIsInstance(ret, dict, ret)
        self.assertTrue(ret, 'state run returned no results')
        for state_id, state_ret in ret.items():
            self.assertIs(
                state_ret.get('result'),
                True,
                '{0}: {1}'.format(state_id, state_ret.get('comment')),
            )

    def test_file_managed(self):
        '''
        Test file.managed on Windows by checking that the file it created
        in setUp exists
        '''
        ret = self.run_state('file.exists', name=WIN_TEST_FILE)
        self._assert_state_succeeded(ret)

    def test_file_copy(self):
        '''
        Test file.copy on Windows
        '''
        copy_path = 'c:/testfile_copy'
        # tearDown only removes WIN_TEST_FILE, so make sure the copy does
        # not outlive the test either.
        self.addCleanup(self.run_state, 'file.absent', name=copy_path)
        ret = self.run_state('file.copy', name=copy_path, makedirs=True, source=WIN_TEST_FILE)
        self._assert_state_succeeded(ret)

    def test_file_comment(self):
        '''
        Test file.comment on Windows
        '''
        self.run_state('file.comment', name=WIN_TEST_FILE, regex='^Only')
        with salt.utils.files.fopen(WIN_TEST_FILE, 'r') as fp_:
            self.assertTrue(fp_.read().startswith('#Only'))

    def test_file_replace(self):
        '''
        Test file.replace on Windows
        '''
        self.run_state('file.replace', name=WIN_TEST_FILE, pattern='test', repl='testing')
        with salt.utils.files.fopen(WIN_TEST_FILE, 'r') as fp_:
            self.assertIn('testing', fp_.read())

    def test_file_absent(self):
        '''
        Test file.absent on Windows
        '''
        ret = self.run_state('file.absent', name=WIN_TEST_FILE)
        self._assert_state_succeeded(ret)
|